diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-29 13:38:13 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-29 14:04:53 +0300 |
commit | 0c53fce10d3b30ce19c626f8b97be491f17f0dfe (patch) | |
tree | 00dec69bd4f07cc6de8b4c6203a032b93f159690 | |
parent | 0e130abb0980cee82bd511ac6f197a350e22c3a1 (diff) | |
download | ydb-0c53fce10d3b30ce19c626f8b97be491f17f0dfe.tar.gz |
Moved dqrun
53 files changed, 4417 insertions, 26 deletions
diff --git a/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..fae483c5c08 --- /dev/null +++ b/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(actors) +add_subdirectory(api) +add_subdirectory(common) +add_subdirectory(config) +add_subdirectory(counters) +add_subdirectory(expr_nodes) +add_subdirectory(interface) +add_subdirectory(local_gateway) +add_subdirectory(mkql) +add_subdirectory(opt) +add_subdirectory(planner) +add_subdirectory(provider) +add_subdirectory(runtime) +add_subdirectory(service) +add_subdirectory(stats_collector) +add_subdirectory(task_runner) +add_subdirectory(task_runner_actor) +add_subdirectory(worker_manager) diff --git a/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..fae483c5c08 --- /dev/null +++ b/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(actors) +add_subdirectory(api) +add_subdirectory(common) +add_subdirectory(config) +add_subdirectory(counters) +add_subdirectory(expr_nodes) +add_subdirectory(interface) +add_subdirectory(local_gateway) +add_subdirectory(mkql) +add_subdirectory(opt) +add_subdirectory(planner) +add_subdirectory(provider) +add_subdirectory(runtime) +add_subdirectory(service) +add_subdirectory(stats_collector) +add_subdirectory(task_runner) +add_subdirectory(task_runner_actor) +add_subdirectory(worker_manager) diff --git a/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..fae483c5c08 --- /dev/null +++ b/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(actors) +add_subdirectory(api) +add_subdirectory(common) +add_subdirectory(config) +add_subdirectory(counters) +add_subdirectory(expr_nodes) +add_subdirectory(interface) +add_subdirectory(local_gateway) +add_subdirectory(mkql) +add_subdirectory(opt) +add_subdirectory(planner) +add_subdirectory(provider) +add_subdirectory(runtime) +add_subdirectory(service) +add_subdirectory(stats_collector) +add_subdirectory(task_runner) +add_subdirectory(task_runner_actor) +add_subdirectory(worker_manager) diff --git a/ydb/library/yql/providers/dq/CMakeLists.txt b/ydb/library/yql/providers/dq/CMakeLists.txt index 8a270ca4455..f8b31df0c11 100644 --- a/ydb/library/yql/providers/dq/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/CMakeLists.txt @@ -6,18 +6,12 @@ # original buildsystem will not be accepted. -add_subdirectory(actors) -add_subdirectory(api) -add_subdirectory(common) -add_subdirectory(config) -add_subdirectory(counters) -add_subdirectory(expr_nodes) -add_subdirectory(interface) -add_subdirectory(mkql) -add_subdirectory(opt) -add_subdirectory(planner) -add_subdirectory(provider) -add_subdirectory(runtime) -add_subdirectory(task_runner) -add_subdirectory(task_runner_actor) -add_subdirectory(worker_manager) +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..8a270ca4455 --- /dev/null +++ 
b/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(actors) +add_subdirectory(api) +add_subdirectory(common) +add_subdirectory(config) +add_subdirectory(counters) +add_subdirectory(expr_nodes) +add_subdirectory(interface) +add_subdirectory(mkql) +add_subdirectory(opt) +add_subdirectory(planner) +add_subdirectory(provider) +add_subdirectory(runtime) +add_subdirectory(task_runner) +add_subdirectory(task_runner_actor) +add_subdirectory(worker_manager) diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..efe17d868c0 --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-local_gateway) +target_compile_options(providers-dq-local_gateway PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-local_gateway PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-utils + dq-actors-compute + providers-dq-provider + dq-api-protos + providers-dq-task_runner + providers-dq-worker_manager + providers-dq-service + providers-dq-stats_collector +) +target_sources(providers-dq-local_gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +) diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..88a991c531a --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-local_gateway) +target_compile_options(providers-dq-local_gateway PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-local_gateway PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-utils + dq-actors-compute + providers-dq-provider + dq-api-protos + providers-dq-task_runner + providers-dq-worker_manager + providers-dq-service + providers-dq-stats_collector +) +target_sources(providers-dq-local_gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +) diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..88a991c531a --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-local_gateway) +target_compile_options(providers-dq-local_gateway PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-local_gateway PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-utils + dq-actors-compute + providers-dq-provider + dq-api-protos + providers-dq-task_runner + providers-dq-worker_manager + providers-dq-service + providers-dq-stats_collector +) +target_sources(providers-dq-local_gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +) diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt new file mode 100644 index 00000000000..606ff46b4be --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/dq/local_gateway/ya.make b/ydb/library/yql/providers/dq/local_gateway/ya.make new file mode 100644 index 00000000000..4b3afaeacbd --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +YQL_LAST_ABI_VERSION() + +SRCS( + yql_dq_gateway_local.cpp +) + +PEERDIR( + ydb/library/yql/utils + ydb/library/yql/dq/actors/compute + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/dq/api/protos + ydb/library/yql/providers/dq/task_runner + ydb/library/yql/providers/dq/worker_manager + ydb/library/yql/providers/dq/service + ydb/library/yql/providers/dq/stats_collector +) + +END() diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp new file mode 100644 index 00000000000..6c01407c04b --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -0,0 +1,266 @@ +#include "yql_dq_gateway_local.h" + +#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> +#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h> + +#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h> +#include <ydb/library/yql/providers/dq/service/service_node.h> + +#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h> + +#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h> + +#include <ydb/library/yql/utils/range_walker.h> +#include <ydb/library/yql/utils/bind_in_range.h> 
+ +#include <library/cpp/messagebus/network.h> + +#include <util/system/env.h> +#include <util/generic/size_literals.h> + +namespace NYql { + +using namespace NActors; +using NDqs::MakeWorkerManagerActorID; + +class TLocalServiceHolder { +public: + TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, + IMetricsRegistryPtr metricsRegistry, + const std::function<IActor*(void)>& metricsPusherFactory) + : MetricsRegistry(metricsRegistry + ? metricsRegistry + : CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq)) + ) + { + ui32 nodeId = 1; + + TString hostName; + TString localAddress; + std::tie(hostName, localAddress) = NDqs::GetLocalAddress(); + + NDqs::TServiceNodeConfig config = { + nodeId, + localAddress, + hostName, + static_cast<ui16>(interconnectPort.Addr.GetPort()), + static_cast<ui16>(grpcPort.Addr.GetPort()), + 0, // mbus + interconnectPort.Socket.Get()->Release(), + grpcPort.Socket.Get()->Release(), + 1 + }; + + ServiceNode = MakeHolder<TServiceNode>( + config, + threads, + MetricsRegistry); + + auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB); + NDqs::TLocalWorkerManagerOptions lwmOptions; + lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, true); + lwmOptions.AsyncIoFactory = std::move(asyncIoFactory); + lwmOptions.FunctionRegistry = functionRegistry; + lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); + lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( + [=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& ) + { + return 
lwmOptions.Factory->Get(task); + }); + lwmOptions.Counters = NDqs::TWorkerManagerCounters(MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm")); + lwmOptions.DropTaskCountersOnFinish = false; + auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); + + ServiceNode->AddLocalService( + MakeWorkerManagerActorID(nodeId), + TActorSetupCmd(resman, TMailboxType::Simple, 0)); + + auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors()); + + auto actorSystem = ServiceNode->StartActorSystem(); + if (metricsPusherFactory) { + actorSystem->Register(metricsPusherFactory()); + } + + actorSystem->Register(statsCollector); + + ServiceNode->StartService(dqTaskPreprocessorFactories); + } + + ~TLocalServiceHolder() + { + ServiceNode->Stop(); + } + +private: + IMetricsRegistryPtr MetricsRegistry; + THolder<TServiceNode> ServiceNode; +}; + +class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl> +{ + struct TRequest { + TString SessionId; + NDqs::TPlan Plan; + TVector<TString> Columns; + THashMap<TString, TString> SecureParams; + THashMap<TString, TString> GraphParams; + TDqSettings::TPtr Settings; + IDqGateway::TDqProgressWriter ProgressWriter; + THashMap<TString, TString> ModulesMapping; + bool Discard; + NThreading::TPromise<IDqGateway::TResult> Result; + }; + +public: + TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) + : LocalService(std::move(localService)) + , Gateway(gateway) + , DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE")) + { } + + NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) { + return Gateway->OpenSession(sessionId, username); + } + + void CloseSession(const TString& sessionId) { + return Gateway->CloseSession(sessionId); + } + + NThreading::TFuture<IDqGateway::TResult> + ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, + const THashMap<TString, 
TString>& secureParams, const THashMap<TString, TString>& graphParams, + const TDqSettings::TPtr& settings, + const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) + { + + NThreading::TFuture<IDqGateway::TResult> result; + { + TGuard<TMutex> lock(Mutex); + Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()}); + result = Queue.back().Result; + } + + TryExecuteNext(); + + return result; + } + +private: + void TryExecuteNext() { + TGuard<TMutex> lock(Mutex); + if (!Queue.empty() && (!DeterministicMode || Inflight == 0)) { + auto request = std::move(Queue.front()); Queue.pop_front(); + Inflight++; + lock.Release(); + + auto weak = weak_from_this(); + + Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard) + .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable { + try { + promise.SetValue(result.GetValue()); + } catch (...) 
{ + promise.SetException(std::current_exception()); + } + + if (auto ptr = weak.lock()) { + { + TGuard<TMutex> lock(ptr->Mutex); + ptr->Inflight--; + } + + ptr->TryExecuteNext(); + } + }); + } + } + + THolder<TLocalServiceHolder> LocalService; + IDqGateway::TPtr Gateway; + const bool DeterministicMode; + TMutex Mutex; + TList<TRequest> Queue; + int Inflight = 0; +}; + +class TDqGatewayLocal : public IDqGateway { +public: + TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway) + : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway)) + {} + + NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { + return Impl->OpenSession(sessionId, username); + } + + void CloseSession(const TString& sessionId) override { + return Impl->CloseSession(sessionId); + } + + NThreading::TFuture<TResult> + ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, + const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, + const TDqSettings::TPtr& settings, + const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) override + { + return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, + settings, progressWriter, modulesMapping, discard); + } + +private: + std::shared_ptr<TDqGatewayLocalImpl> Impl; +}; + +THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, + NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, + IMetricsRegistryPtr metricsRegistry, + const std::function<IActor*(void)>& metricsPusherFactory) +{ + return 
MakeHolder<TLocalServiceHolder>(functionRegistry, + compFactory, + taskTransformFactory, + dqTaskPreprocessorFactories, + interconnectPort, + grpcPort, + std::move(asyncIoFactory), + threads, + metricsRegistry, + metricsPusherFactory); +} + +TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, + IMetricsRegistryPtr metricsRegistry, + const std::function<IActor*(void)>& metricsPusherFactory) +{ + int startPort = 31337; + TRangeWalker<int> portWalker(startPort, startPort+100); + auto interconnectPort = BindInRange(portWalker)[1]; + auto grpcPort = BindInRange(portWalker)[1]; + + return new TDqGatewayLocal( + CreateLocalServiceHolder( + functionRegistry, + compFactory, + taskTransformFactory, + dqTaskPreprocessorFactories, + interconnectPort, + grpcPort, + std::move(asyncIoFactory), + threads, + metricsRegistry, + metricsPusherFactory), + CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort())); +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h new file mode 100644 index 00000000000..b558fbeb7dd --- /dev/null +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> +#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/providers/common/metrics/metrics_registry.h> + +namespace NActors { +class IActor; +} + +namespace NYql { + +TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const 
NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, + NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16, + IMetricsRegistryPtr metricsRegistry = {}, + const std::function<NActors::IActor*(void)>& metricsPusherFactory = {}); + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..b1532b4f2eb --- /dev/null +++ b/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-service) +target_compile_options(providers-dq-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-service PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + library-cpp-build_info + cpp-grpc-server + grpc-server-actors + library-cpp-svnversion + cpp-threading-future + library-yql-sql + api-protos + providers-common-metrics + providers-dq-actors + dq-api-grpc + providers-dq-common + providers-dq-counters + providers-dq-interface + providers-dq-worker_manager + dq-worker_manager-interface +) +target_sources(providers-dq-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +) diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..934db13e595 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt @@ -0,0 +1,42 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-service) +target_compile_options(providers-dq-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + library-cpp-build_info + cpp-grpc-server + grpc-server-actors + library-cpp-svnversion + cpp-threading-future + library-yql-sql + api-protos + providers-common-metrics + providers-dq-actors + dq-api-grpc + providers-dq-common + providers-dq-counters + providers-dq-interface + providers-dq-worker_manager + dq-worker_manager-interface +) +target_sources(providers-dq-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +) diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..934db13e595 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt @@ -0,0 +1,42 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-service) +target_compile_options(providers-dq-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + library-cpp-build_info + cpp-grpc-server + grpc-server-actors + library-cpp-svnversion + cpp-threading-future + library-yql-sql + api-protos + providers-common-metrics + providers-dq-actors + dq-api-grpc + providers-dq-common + providers-dq-counters + providers-dq-interface + providers-dq-worker_manager + dq-worker_manager-interface +) +target_sources(providers-dq-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp +) diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.txt b/ydb/library/yql/providers/dq/service/CMakeLists.txt new file mode 100644 index 00000000000..606ff46b4be --- /dev/null +++ b/ydb/library/yql/providers/dq/service/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp new file mode 100644 index 00000000000..cfe6b53f0b7 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -0,0 +1,839 @@ +#include "grpc_service.h" + +#include <ydb/library/yql/utils/log/log.h> + +#include <ydb/library/yql/providers/dq/actors/actor_helpers.h> +#include <ydb/library/yql/providers/dq/actors/executer_actor.h> +#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h> +#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> +#include <ydb/library/yql/providers/dq/actors/result_aggregator.h> +#include <ydb/library/yql/providers/dq/actors/events.h> +#include <ydb/library/yql/providers/dq/actors/task_controller.h> +#include <ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h> + +#include <ydb/library/yql/providers/dq/counters/counters.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> + +//#include <yql/tools/yqlworker/dq/worker_manager/benchmark.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> + +#include <library/cpp/grpc/server/grpc_counters.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> + +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/helpers/future_callback.h> +#include <library/cpp/build_info/build_info.h> +#include 
<library/cpp/svnversion/svnversion.h> + +#include <util/string/split.h> +#include <util/system/env.h> + +namespace NYql::NDqs { + using namespace NYql::NDqs; + using namespace NKikimr; + using namespace NThreading; + using namespace NMonitoring; + using namespace NActors; + + namespace { + NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) { + auto grpcCB = counters->GetSubgroup("rpc_name", name); + return MakeIntrusive<NGrpc::TCounterBlock>( + grpcCB->GetCounter("total", true), + grpcCB->GetCounter("infly", true), + grpcCB->GetCounter("notOkReq", true), + grpcCB->GetCounter("notOkResp", true), + grpcCB->GetCounter("reqBytes", true), + grpcCB->GetCounter("inflyReqBytes", true), + grpcCB->GetCounter("resBytes", true), + grpcCB->GetCounter("notAuth", true), + grpcCB->GetCounter("resExh", true), + grpcCB); + } + + template<typename RequestType, typename ResponseType> + class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> { + public: + static constexpr char ActorName[] = "SERVICE_PROXY"; + static constexpr char RetryName[] = "OperationRetry"; + + explicit TServiceProxyActor( + NGrpc::IRequestContextBase* ctx, + const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, + const TString& traceId, const TString& username) + : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler) + , Ctx(ctx) + , Counters(counters) + , ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor")) + , ClientDisconnectedCounter(ServiceProxyActorCounters->GetCounter("ClientDisconnected", /*derivative=*/ true)) + , RetryCounter(ServiceProxyActorCounters->GetCounter(RetryName, /*derivative=*/ true)) + , FallbackCounter(ServiceProxyActorCounters->GetCounter("Fallback", /*derivative=*/ true)) + , ErrorCounter(ServiceProxyActorCounters->GetCounter("UnrecoverableError", /*derivative=*/ true)) + , 
Request(dynamic_cast<const RequestType*>(ctx->GetRequest())) + , TraceId(traceId) + , Username(username) + , Promise(NewPromise<void>()) + { + Settings->Dispatch(Request->GetSettings()); + Settings->FreezeDefaults(); + + MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries); + RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds())); + } + + STRICT_STFUNC(Handler, { + HFunc(TEvQueryResponse, OnReturnResult); + cFunc(TEvents::TEvPoison::EventType, OnPoison); + SFunc(TEvents::TEvBootstrap, DoBootstrap); + hFunc(TEvDqStats, Handle); + }) + + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { + return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0); + } + + void OnPoison() { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ; + ReplyError(grpc::UNAVAILABLE, "Unexpected error"); + *ClientDisconnectedCounter += 1; + } + + void Handle(TEvDqStats::TPtr&) { + // Do nothing + } + + void DoPassAway() override { + Promise.SetValue(); + } + + void DoBootstrap(const NActors::TActorContext& ctx) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + if (!CtxSubscribed) { + auto selfId = ctx.SelfID; + auto* actorSystem = ctx.ExecutorThread.ActorSystem; + Ctx->GetFinishFuture().Subscribe([selfId, actorSystem](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) { + Y_VERIFY(future.HasValue()); + if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) { + actorSystem->Send(selfId, new TEvents::TEvPoison()); + } + }); + CtxSubscribed = true; + } + Bootstrap(); + } + + virtual void Bootstrap() = 0; + + void SendResponse(TEvQueryResponse::TPtr& ev) + { + auto& result = ev->Get()->Record; + Yql::DqsProto::ExecuteQueryResult queryResult; + queryResult.Mutableresult()->CopyFrom(result.resultset()); + queryResult.set_yson(result.yson()); + + auto statusCode = result.GetStatusCode(); + // this code guarantees 
that query will be considered failed unless the status is SUCCESS + // fallback may be performed as an extra measure + if (statusCode != NYql::NDqProto::StatusIds::SUCCESS) { + YQL_CLOG(ERROR, ProviderDq) << "Query is considered FAILED, status=" << static_cast<int>(statusCode); + NYql::TIssue rootIssue("Fatal Error"); + rootIssue.SetCode(NCommon::NeedFallback(statusCode) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR); + NYql::TIssues issues; + NYql::IssuesFromMessage(result.GetIssues(), issues); + for (const auto& issue: issues) { + rootIssue.AddSubIssue(MakeIntrusive<TIssue>(issue)); + } + result.MutableIssues()->Clear(); + NYql::IssuesToMessage({rootIssue}, result.MutableIssues()); + } + + for (const auto& [k, v] : QueryStat.Get()) { + std::map<TString, TString> labels; + TString prefix, name; + if (NCommon::ParseCounterName(&prefix, &labels, &name, k)) { + if (prefix == "Actor") { + auto group = Counters->GetSubgroup("counters", "Actor"); + for (const auto& [k, v] : labels) { + group = group->GetSubgroup(k, v); + } + group->GetHistogram(name, ExponentialHistogram(10, 2, 50))->Collect(v.Sum); + } + } + } + + if (Settings->AggregateStatsByStage.Get().GetOrElse(TDqSettings::TDefault::AggregateStatsByStage)) { + auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage); + aggregatedQueryStat.FlushCounters(ResponseBuffer); + } else { + QueryStat.FlushCounters(ResponseBuffer); + } + + auto& operation = *ResponseBuffer.mutable_operation(); + operation.Setready(true); + operation.Mutableresult()->PackFrom(queryResult); + *operation.Mutableissues() = result.GetIssues(); + ResponseBuffer.SetTruncated(result.GetTruncated()); + + Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0); + } + + virtual void DoRetry() + { + this->CleanupChildren(); + auto selfId = this->SelfId(); + TActivationContext::Schedule(RetryBackoff, new IEventHandle(selfId, selfId, new 
TEvents::TEvBootstrap(), 0)); + Retry += 1; + *RetryCounter +=1 ; + } + + void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) { + auto& result = ev->Get()->Record; + Y_UNUSED(ctx); + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size(); + QueryStat.AddCounters(result); + + auto statusCode = result.GetStatusCode(); + if ((statusCode != NYql::NDqProto::StatusIds::SUCCESS || result.GetIssues().size() > 0) && NCommon::IsRetriable(statusCode)) { + if (Retry < MaxRetries) { + QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0)); + NYql::TIssues issues; + NYql::IssuesFromMessage(result.GetIssues(), issues); + YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString(); + DoRetry(); + } else { + YQL_CLOG(ERROR, ProviderDq) << "Retries limit exceeded, status= " << static_cast<int>(ev->Get()->Record.GetStatusCode()); + if (statusCode == NYql::NDqProto::StatusIds::SUCCESS) { + ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::INTERNAL_ERROR); + } + SendResponse(ev); + } + } else { + SendResponse(ev); + } + } + + TFuture<void> GetFuture() { + return Promise.GetFuture(); + } + + virtual void ReplyError(grpc::StatusCode code, const TString& msg) { + Ctx->ReplyError(code, msg); + this->PassAway(); + } + + virtual void Reply(ui32 status, bool hasIssues) { + Y_UNUSED(hasIssues); + Ctx->Reply(&ResponseBuffer, status); + this->PassAway(); + } + + private: + NGrpc::IRequestContextBase* Ctx; + bool CtxSubscribed = false; + ResponseType ResponseBuffer; + + protected: + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + TIntrusivePtr<NMonitoring::TDynamicCounters> ServiceProxyActorCounters; + TDynamicCounters::TCounterPtr ClientDisconnectedCounter; + TDynamicCounters::TCounterPtr RetryCounter; + TDynamicCounters::TCounterPtr FallbackCounter; + TDynamicCounters::TCounterPtr ErrorCounter; + + const RequestType* Request; 
+ const TString TraceId; + const TString Username; + TPromise<void> Promise; + const TInstant RequestStartTime = TInstant::Now(); + + TDqConfiguration::TPtr Settings = MakeIntrusive<TDqConfiguration>(); + + int Retry = 0; + int MaxRetries = 10; + TDuration RetryBackoff = TDuration::MilliSeconds(1000); + + NYql::TCounters QueryStat; + THashMap<ui64, ui64> Task2Stage; + + void RestoreRequest() { + Request = dynamic_cast<const RequestType*>(Ctx->GetRequest()); + } + }; + + class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> { + public: + using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>; + TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx, + const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, + const TString& traceId, const TString& username, + const NActors::TActorId& graphExecutionEventsActorId) + : TServiceProxyActor(ctx, counters, traceId, username) + , GraphExecutionEventsActorId(graphExecutionEventsActorId) + { + } + + void DoRetry() override { + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; + SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) { + if (ev->Get()->Record.GetErrorMessage()) { + TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); + } else { + RestoreRequest(); + ModifiedRequest.Reset(); + TBase::DoRetry(); + } + }); + } + + void Reply(ui32 status, bool hasIssues) override { + auto eventType = hasIssues + ? 
NYql::NDqProto::EGraphExecutionEventType::FAIL + : NYql::NDqProto::EGraphExecutionEventType::SUCCESS; + SendEvent(eventType, nullptr, [this, status, hasIssues](const auto& ev) { + if (!hasIssues && ev->Get()->Record.GetErrorMessage()) { + TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); + } else { + TBase::Reply(status, hasIssues); + } + }); + } + + void ReplyError(grpc::StatusCode code, const TString& msg) override { + SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) { + Y_UNUSED(ev); + TBase::ReplyError(code, msg); + }); + } + + private: + THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest; + + void DoPassAway() override { + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; + Send(GraphExecutionEventsActorId, new TEvents::TEvPoison()); + TServiceProxyActor::DoPassAway(); + } + + NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const { + NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result; + + for (const auto& kv : Request->GetSecureParams()) { + result.MutableSecureParams()->MutableData()->insert(kv); + } + + for (const auto& kv : Request->GetGraphParams()) { + result.MutableGraphParams()->MutableData()->insert(kv); + } + + return result; + } + + void Bootstrap() override { + YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph"; + + SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) { + if (ev->Get()->Record.GetErrorMessage()) { + TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage()); + } else { + NDqProto::TGraphExecutionEvent::TMap params; + ev->Get()->Record.GetMessage().UnpackTo(¶ms); + FinishBootstrap(params); + } + }); + } + + void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) { + if (!params.data().empty()) { + for (size_t i = 0; i < Request->TaskSize(); ++i) { + if (!ModifiedRequest) { + 
ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest()); + ModifiedRequest->CopyFrom(*Request); + } + + auto* task = ModifiedRequest->MutableTask(i); + + Yql::DqsProto::TTaskMeta taskMeta; + task->GetMeta().UnpackTo(&taskMeta); + + for (const auto&[key, value] : params.data()) { + (*taskMeta.MutableTaskParams())[key] = value; + } + + task->MutableMeta()->PackFrom(taskMeta); + } + } + + if (ModifiedRequest) { + Request = ModifiedRequest.Get(); + } + } + + void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) { + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; + MergeTaskMetas(params); + + auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); + + TVector<TString> columns; + columns.reserve(Request->GetColumns().size()); + for (const auto& column : Request->GetColumns()) { + columns.push_back(column); + } + for (const auto& task : Request->GetTask()) { + Yql::DqsProto::TTaskMeta taskMeta; + task.GetMeta().UnpackTo(&taskMeta); + Task2Stage[task.GetId()] = taskMeta.GetStageId(); + } + THashMap<TString, TString> secureParams; + for (const auto& x : Request->GetSecureParams()) { + secureParams[x.first] = x.second; + } + auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator( + columns, + executerId, + TraceId, + secureParams, + Settings, + Request->GetResultType(), + Request->GetDiscard(), + GraphExecutionEventsActorId).Release()); + auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? 
resultId + : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYql::NCommon::TServiceCounters(Counters, nullptr, "")).Release()); + Send(executerId, MakeHolder<TEvGraphRequest>( + *Request, + controlId, + resultId)); + } + + template <class TPayload, class TCallback> + void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) { + NDqProto::TGraphExecutionEvent record; + record.SetEventType(eventType); + if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) { + record.MutableMessage()->PackFrom(payload); + } + Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record)); + Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(traceId); + Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC); + callback(ev); + }); + } + + NActors::TActorId GraphExecutionEventsActorId; + }; + + TString GetVersionString() { + TStringBuilder sb; + sb << GetProgramSvnVersion() << "\n"; + sb << GetBuildInfo(); + TString sandboxTaskId = GetSandboxTaskId(); + if (sandboxTaskId != TString("0")) { + sb << "\nSandbox task id: " << sandboxTaskId; + } + + return sb; + } + } + + TDqsGrpcService::TDqsGrpcService( + NActors::TActorSystem& system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) + : ActorSystem(system) + , Counters(std::move(counters)) + , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories) + , Promise(NewPromise<void>()) + , RunningRequests(0) + , Stopping(false) + , Sessions(&system, Counters->GetSubgroup("component", "grpc")->GetCounter("Sessions")) + { } + +#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ + do { \ + MakeIntrusive<NGrpc::TGRpcRequest<Yql::DqsProto::IN, Yql::DqsProto::OUT, TDqsGrpcService>>( \ + this, \ + &Service_, \ + CQ, \ + [this](NGrpc::IRequestContextBase* ctx) { 
ACTION }, \ + &Yql::DqsProto::DqService::AsyncService::Request##NAME, \ + #NAME, \ + logger, \ + BuildCB(Counters, #NAME)) \ + ->Run(); \ + } while (0) + + void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { + using namespace google::protobuf; + + CQ = cq; + + using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>; + + ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, { + TGuard<TMutex> lock(Mutex); + if (Stopping) { + ctx->ReplyError(grpc::UNAVAILABLE, "Terminating in progress"); + return; + } + auto* request = dynamic_cast<const Yql::DqsProto::ExecuteGraphRequest*>(ctx->GetRequest()); + auto session = Sessions.GetSession(request->GetSession()); + if (!session) { + TString message = TStringBuilder() + << "Bad session: " + << request->GetSession(); + YQL_CLOG(DEBUG, ProviderDq) << message; + ctx->ReplyError(grpc::INVALID_ARGUMENT, message); + return; + } + + TDqTaskPreprocessorCollection taskPreprocessors; + for (const auto& factory: DqTaskPreprocessorFactories) { + taskPreprocessors.push_back(factory()); + } + + auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors))); + + RunningRequests++; + auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId); + auto future = actor->GetFuture(); + auto actorId = ActorSystem.Register(actor.Release()); + future.Apply([session, actorId, this] (const TFuture<void>&) mutable { + RunningRequests--; + if (Stopping && !RunningRequests) { + Promise.SetValue(); + } + + session->DeleteRequest(actorId); + }); + session->AddRequest(actorId); + }); + + ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, { + Y_UNUSED(this); + Yql::DqsProto::SvnRevisionResponse result; + result.SetRevision(GetVersionString()); + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + }); + + 
ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, { + Y_UNUSED(this); + auto* request = dynamic_cast<const Yql::DqsProto::CloseSessionRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + Yql::DqsProto::CloseSessionResponse result; + Sessions.CloseSession(request->GetSession()); + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + }); + + ADD_REQUEST(PingSession, PingSessionRequest, PingSessionResponse, { + Y_UNUSED(this); + auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession(); + + Yql::DqsProto::PingSessionResponse result; + auto session = Sessions.GetSession(request->GetSession()); + if (!session) { + TString message = TStringBuilder() + << "Bad session: " + << request->GetSession(); + YQL_CLOG(DEBUG, ProviderDq) << message; + ctx->ReplyError(grpc::INVALID_ARGUMENT, message); + } else { + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + } + }); + + ADD_REQUEST(OpenSession, OpenSessionRequest, OpenSessionResponse, { + Y_UNUSED(this); + auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername(); + + Yql::DqsProto::OpenSessionResponse result; + if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) { + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + } else { + ctx->ReplyError(grpc::INVALID_ARGUMENT, "Session `" + request->GetSession() + "' exists"); + } + }); + + ADD_REQUEST(JobStop, JobStopRequest, JobStopResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::JobStopRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto ev = MakeHolder<TEvJobStop>(*request); + + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::JobStopResponse>(ctx->GetArena()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + + 
ActorSystem.Send(MakeWorkerManagerActorID(ActorSystem.NodeId), ev.Release()); + }); + + ADD_REQUEST(ClusterStatus, ClusterStatusRequest, ClusterStatusResponse, { + auto ev = MakeHolder<TEvClusterStatus>(); + + using ResultEv = TEvClusterStatusResponse; + + TExecutorPoolStats poolStats; + + TExecutorThreadStats stat; + TVector<TExecutorThreadStats> stats; + ActorSystem.GetPoolStats(0, poolStats, stats); + + for (const auto& s : stats) { + stat.Aggregate(s); + } + + YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents; + YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents; + YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents; + YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation; + Sessions.PrintInfo(); + + for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) { + if (stat.ActorsAliveByActivity[i]) { + YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i]; + } + } + + auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>( + [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ClusterStatusResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); + }); + + ADD_REQUEST(OperationStop, OperationStopRequest, OperationStopResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::OperationStopRequest*>(ctx->GetRequest()); + auto ev = MakeHolder<TEvOperationStop>(*request); + + auto 
callback = MakeHolder<TRichActorFutureCallback<TEvOperationStopResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvOperationStopResponse>>& event) mutable { + Y_UNUSED(event); + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::OperationStopResponse>(ctx->GetArena()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); + }); + + ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest()); + + auto ev = MakeHolder<TEvQueryStatus>(*request); + + auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); + }); + + ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + if (!request->GetPort() + || request->GetRole().empty() + || request->GetAddress().empty() + || 
request->GetRevision().empty()) + { + ctx->ReplyError(grpc::INVALID_ARGUMENT, "Invalid argument"); + return; + } + + auto ev = MakeHolder<TEvRegisterNode>(*request); + + using ResultEv = TEvRegisterNodeResponse; + + auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>( + [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::RegisterNodeResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(5000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); + }); + + ADD_REQUEST(GetMaster, GetMasterRequest, GetMasterResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::GetMasterRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto requestEvent = MakeHolder<TEvGetMasterRequest>(); + + auto callback = MakeHolder<TActorFutureCallback<TEvGetMasterResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvGetMasterResponse>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::GetMasterResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release())); + }); + + ADD_REQUEST(ConfigureFailureInjector, ConfigureFailureInjectorRequest, ConfigureFailureInjectorResponse,{ + auto* request = dynamic_cast<const Yql::DqsProto::ConfigureFailureInjectorRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto 
requestEvent = MakeHolder<TEvConfigureFailureInjectorRequest>(*request); + + auto callback = MakeHolder<TActorFutureCallback<TEvConfigureFailureInjectorResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvConfigureFailureInjectorResponse>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ConfigureFailureInjectorResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release())); + }); + + ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto ev = MakeHolder<TEvIsReady>(*request); + + auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable { + Yql::DqsProto::IsReadyResponse result; + result.SetIsReady(event->Get()->Record.GetIsReady()); + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release())); + }); + + ADD_REQUEST(Routes, RoutesRequest, RoutesResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::RoutesRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto ev = MakeHolder<TEvRoutesRequest>(); + + auto callback = MakeHolder<TRichActorFutureCallback<TEvRoutesResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvRoutesResponse>>& event) mutable { + Yql::DqsProto::RoutesResponse result; + 
result.MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_CLOG(INFO, ProviderDq) << "Routes failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(5000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(request->GetNodeId()), callbackId, ev.Release())); + }); + +/* 1. move grpc to providers/dq, 2. move benchmark to providers/dq 3. uncomment + ADD_REQUEST(Benchmark, BenchmarkRequest, BenchmarkResponse, { + auto* req = dynamic_cast<const Yql::DqsProto::BenchmarkRequest*>(ctx->GetRequest()); + Y_VERIFY(!!req); + + TWorkerManagerBenchmarkOptions options; + if (req->GetWorkerCount()) { + options.WorkerCount = req->GetWorkerCount(); + } + if (req->GetInflight()) { + options.Inflight = req->GetInflight(); + } + if (req->GetTotalRequests()) { + options.TotalRequests = req->GetTotalRequests(); + } + if (req->GetMaxRunTimeMs()) { + options.MaxRunTimeMs = TDuration::MilliSeconds(req->GetMaxRunTimeMs()); + } + + auto benchmarkId = ActorSystem.Register( + CreateWorkerManagerBenchmark( + MakeWorkerManagerActorID(ActorSystem.NodeId), options + )); + + ActorSystem.Send(benchmarkId, new TEvents::TEvBootstrap); + + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::BenchmarkResponse>(ctx->GetArena()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }); +*/ + } + + void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter = limiter; + } + + bool TDqsGrpcService::IncRequest() { + return Limiter->Inc(); + } + + void TDqsGrpcService::DecRequest() { + Limiter->Dec(); + Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); + } + + TFuture<void> TDqsGrpcService::Stop() { + TGuard<TMutex> lock(Mutex); + Stopping = true; + auto future = Promise.GetFuture(); + if (RunningRequests == 0) { + Promise.SetValue(); + } + return future; + } +} diff --git 
a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h new file mode 100644 index 00000000000..fa2a835c8c6 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/grpc_service.h @@ -0,0 +1,52 @@ +#pragma once + +#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> + +#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> +#include <ydb/library/yql/providers/dq/api/protos/service.pb.h> + +#include <ydb/library/yql/minikql/mkql_function_registry.h> + +#include <library/cpp/grpc/server/grpc_request.h> +#include <library/cpp/grpc/server/grpc_server.h> + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/threading/future/future.h> + +#include "grpc_session.h" + +namespace NYql::NDqs { + class TDatabaseManager; + + class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> { + public: + TDqsGrpcService(NActors::TActorSystem& system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + + bool IncRequest(); + void DecRequest(); + + NThreading::TFuture<void> Stop(); + + private: + NActors::TActorSystem& ActorSystem; + grpc::ServerCompletionQueue* CQ = nullptr; + NGrpc::TGlobalLimiter* Limiter = nullptr; + + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; + TMutex Mutex; + NThreading::TPromise<void> Promise; + std::atomic<ui64> RunningRequests; + std::atomic<bool> Stopping; + + TSessionStorage Sessions; + }; +} diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp 
b/ydb/library/yql/providers/dq/service/grpc_session.cpp new file mode 100644 index 00000000000..2de87f9d46a --- /dev/null +++ b/ydb/library/yql/providers/dq/service/grpc_session.cpp @@ -0,0 +1,106 @@ +#include "grpc_session.h" + +#include <ydb/library/yql/utils/log/log.h> + +namespace NYql::NDqs { + +TSession::~TSession() { + TGuard<TMutex> lock(Mutex); + for (auto actorId : Requests) { + ActorSystem->Send(actorId, new NActors::TEvents::TEvPoison()); + } +} + +void TSession::DeleteRequest(const NActors::TActorId& actorId) +{ + TGuard<TMutex> lock(Mutex); + Requests.erase(actorId); +} + +void TSession::AddRequest(const NActors::TActorId& actorId) +{ + TGuard<TMutex> lock(Mutex); + Requests.insert(actorId); +} + +TSessionStorage::TSessionStorage( + NActors::TActorSystem* actorSystem, + const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter) + : ActorSystem(actorSystem) + , SessionsCounter(sessionsCounter) +{ } + +void TSessionStorage::CloseSession(const TString& sessionId) +{ + TGuard<TMutex> lock(SessionMutex); + auto it = Sessions.find(sessionId); + if (it == Sessions.end()) { + return; + } + SessionsByLastUpdate.erase(it->second.Iterator); + Sessions.erase(it); + *SessionsCounter = Sessions.size(); +} + +std::shared_ptr<TSession> TSessionStorage::GetSession(const TString& sessionId) +{ + Clean(TInstant::Now() - TDuration::Minutes(10)); + + TGuard<TMutex> lock(SessionMutex); + auto it = Sessions.find(sessionId); + if (it == Sessions.end()) { + return std::shared_ptr<TSession>(); + } else { + SessionsByLastUpdate.erase(it->second.Iterator); + SessionsByLastUpdate.push_back({TInstant::Now(), sessionId}); + it->second.Iterator = SessionsByLastUpdate.end(); + it->second.Iterator--; + return it->second.Session; + } +} + +bool TSessionStorage::OpenSession(const TString& sessionId, const TString& username) +{ + TGuard<TMutex> lock(SessionMutex); + if (Sessions.contains(sessionId)) { + return false; + } + + SessionsByLastUpdate.push_back({TInstant::Now(), 
sessionId}); + auto it = SessionsByLastUpdate.end(); --it; + + Sessions[sessionId] = TSessionAndIterator { + std::make_shared<TSession>(username, ActorSystem), + it + }; + + *SessionsCounter = Sessions.size(); + + return true; +} + +void TSessionStorage::Clean(TInstant before) { + TGuard<TMutex> lock(SessionMutex); + for (TSessionsByLastUpdate::iterator it = SessionsByLastUpdate.begin(); + it != SessionsByLastUpdate.end(); ) + { + if (it->LastUpdate < before) { + YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId; + Sessions.erase(it->SessionId); + it = SessionsByLastUpdate.erase(it); + } else { + break; + } + } + + *SessionsCounter = Sessions.size(); +} + +void TSessionStorage::PrintInfo() const { + YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size(); + YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size(); + ui64 currenSessionsCounter = *SessionsCounter; + YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter; +} + +} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/grpc_session.h b/ydb/library/yql/providers/dq/service/grpc_session.h new file mode 100644 index 00000000000..56e592a757d --- /dev/null +++ b/ydb/library/yql/providers/dq/service/grpc_session.h @@ -0,0 +1,57 @@ +#pragma once +#include <library/cpp/actors/core/actorsystem.h> + +namespace NYql::NDqs { + +class TSession { +public: + TSession(const TString& username, NActors::TActorSystem* actorSystem) + : Username(username) + , ActorSystem(actorSystem) + { } + + ~TSession(); + + const TString& GetUsername() const { + return Username; + } + + void DeleteRequest(const NActors::TActorId& id); + void AddRequest(const NActors::TActorId& id); + +private: + TMutex Mutex; + const TString Username; + NActors::TActorSystem* ActorSystem; + THashSet<NActors::TActorId> Requests; +}; + +class TSessionStorage { +public: + TSessionStorage( + NActors::TActorSystem* actorSystem, + const 
NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter); + void CloseSession(const TString& sessionId); + std::shared_ptr<TSession> GetSession(const TString& sessionId); + bool OpenSession(const TString& sessionId, const TString& username); + void Clean(TInstant before); + void PrintInfo() const; + +private: + NActors::TActorSystem* ActorSystem; + NMonitoring::TDynamicCounters::TCounterPtr SessionsCounter; + TMutex SessionMutex; + struct TTimeAndSessionId { + TInstant LastUpdate; + TString SessionId; + }; + using TSessionsByLastUpdate = TList<TTimeAndSessionId>; + TSessionsByLastUpdate SessionsByLastUpdate; + struct TSessionAndIterator { + std::shared_ptr<TSession> Session; + TSessionsByLastUpdate::iterator Iterator; + }; + THashMap<TString, TSessionAndIterator> Sessions; +}; + +} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp new file mode 100644 index 00000000000..e555d0051dc --- /dev/null +++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp @@ -0,0 +1,302 @@ +#include "interconnect_helpers.h" +#include "service_node.h" + +#include "grpc_service.h" + +#include <library/cpp/actors/helpers/selfping_actor.h> + +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> + +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/core/scheduler_actor.h> +#include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_common.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include 
<library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/yson/node/node_io.h> + +#include <util/stream/file.h> +#include <util/system/env.h> + +namespace NYql::NDqs { + using namespace NActors; + using namespace NActors::NDnsResolver; + using namespace NGrpc; + + class TYqlLogBackend: public TLogBackend { + void WriteData(const TLogRecord& rec) override { + TString message(rec.Data, rec.Len); + if (message.find("ICP01 ready to work") != TString::npos) { + return; + } + YQL_CLOG(DEBUG, ProviderDq) << message; + } + + void ReopenLog() override { } + }; + + static void InitSelfPingActor(NActors::TActorSystemSetup* setup, NMonitoring::TDynamicCounterPtr rootCounters) + { + const TDuration selfPingInterval = TDuration::MilliSeconds(10); + + const auto counters = rootCounters->GetSubgroup("counters", "utils"); + + for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) { + const auto& poolName = setup->GetPoolName(poolId); + auto poolGroup = counters->GetSubgroup("execpool", poolName); + auto selfPinfMaxCounter = poolGroup->GetCounter("SelfPingMaxUs", false); + auto selfPinfAvgCounter = poolGroup->GetCounter("SelfPingAvgUs", false); + auto selfPinfAvgCounterIn1s = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); + auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false); + IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, selfPinfMaxCounter, + selfPinfAvgCounter, selfPinfAvgCounterIn1s, cpuTimeCounter); + setup->LocalServices.push_back( + std::make_pair(TActorId(), + TActorSetupCmd(selfPingActor, + TMailboxType::HTSwap, + poolId))); + } + } + + std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup( + ui32 nodeId, + TString interconnectAddress, + ui16 port, + SOCKET socket, + TVector<ui32> threads, + NMonitoring::TDynamicCounterPtr counters, + const TNameserverFactory& nameserverFactory, + TMaybe<ui32> maxNodeId, + const NYql::NProto::TDqConfig::TICSettings& 
icSettings) + { + auto setup = MakeHolder<TActorSystemSetup>(); + + setup->NodeId = nodeId; + + if (threads.empty()) { + threads = {icSettings.GetThreads()}; + } + + setup->ExecutorsCount = threads.size(); + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + setup->Executors[i] = new TBasicExecutorPool( + i, + threads[i], + 50, + "pool-"+ToString(i)// poolName + ); + } + auto schedulerConfig = TSchedulerConfig(); + schedulerConfig.MonCounters = counters; + +#define SET_VALUE(name) \ + if (icSettings.Has ## name()) { \ + schedulerConfig.name = icSettings.Get ## name (); \ + YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \ + } + + SET_VALUE(ResolutionMicroseconds); + SET_VALUE(SpinThreshold); + SET_VALUE(ProgressThreshold); + SET_VALUE(UseSchedulerActor); + SET_VALUE(RelaxedSendPaceEventsPerSecond); + SET_VALUE(RelaxedSendPaceEventsPerCycle); + SET_VALUE(RelaxedSendThresholdEventsPerSecond); + SET_VALUE(RelaxedSendThresholdEventsPerCycle); + +#undef SET_VALUE + + setup->Scheduler = CreateSchedulerThread(schedulerConfig); + + YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services"; + setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); + if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) { + TActorId schedulerActorId = MakeSchedulerActorId(); + setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0)); + } + + NActors::TActorId loggerActorId(nodeId, "logger"); + auto logSettings = MakeIntrusive<NActors::NLog::TSettings>(loggerActorId, + 0, NActors::NLog::PRI_INFO); + static TString defaultComponent = "ActorLib"; + logSettings->Append(0, 1024, [&](NActors::NLog::EComponent) -> const TString & { return defaultComponent; }); + TString explanation = ""; + if (YQL_CVLOG_ACTIVE(NLog::ELevel::TRACE, 
NLog::EComponent::CoreDq)) { + logSettings->SetLevel(NActors::NLog::PRI_TRACE, 535 /*NKikimrServices::KQP_COMPUTE*/, explanation); + logSettings->SetLevel(NActors::NLog::PRI_TRACE, 713 /*NKikimrServices::YQL_PROXY*/, explanation); + logSettings->SetLevel(NActors::NLog::PRI_TRACE, 1165 /*NKikimrServices::DQ_TASK_RUNNER*/, explanation); + } + NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor( + logSettings, + new TYqlLogBackend, + counters->GetSubgroup("logger", "counters")); + setup->LocalServices.emplace_back(logSettings->LoggerActorId, TActorSetupCmd(loggerActor, TMailboxType::Simple, 0)); + + TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup(); + THashSet<ui32> staticNodeId; + + YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table"; + nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port); + + setup->LocalServices.emplace_back( + MakeDnsResolverActorId(), TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0)); + + setup->LocalServices.emplace_back( + GetNameserviceActorId(), TActorSetupCmd(nameserverFactory(nameserverTable), TMailboxType::ReadAsFilled, 0)); + + + InitSelfPingActor(setup.Get(), counters); + + TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon(); + icCommon->NameserviceId = GetNameserviceActorId(); + Y_UNUSED(counters); + //icCommon->MonCounters = counters->GetSubgroup("counters", "interconnect"); + icCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); + +#define SET_DURATION(name) \ + { \ + icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \ + YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ + } + +#define SET_VALUE(name) \ + { \ + icCommon->Settings.name = icSettings.Get ## name(); \ + YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \ + } + + SET_DURATION(Handshake); + SET_DURATION(DeadPeer); 
+ SET_DURATION(CloseOnIdle); + + SET_VALUE(SendBufferDieLimitInMB); + SET_VALUE(TotalInflightAmountOfData); + SET_VALUE(MergePerPeerCounters); + SET_VALUE(MergePerDataCenterCounters); + SET_VALUE(TCPSocketBufferSize); + + SET_DURATION(PingPeriod); + SET_DURATION(ForceConfirmPeriod); + SET_DURATION(LostConnection); + SET_DURATION(BatchPeriod); + + SET_DURATION(MessagePendingTimeout); + + SET_VALUE(MessagePendingSize); + SET_VALUE(MaxSerializedEventSize); + +#undef SET_DURATION +#undef SET_VALUE + + YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors"; + auto effectiveMaxNodeId = maxNodeId.GetOrElse(static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId)); + setup->Interconnect.ProxyActors.resize(effectiveMaxNodeId + 1); + for (ui32 id = 1; id <= effectiveMaxNodeId; ++id) { + if (nodeId != id) { + IActor* actor = new TInterconnectProxyTCP(id, icCommon); + setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); + } + } + + // start listener + YQL_CLOG(DEBUG, ProviderDq) << "Start listener"; + { + icCommon->TechnicalSelfHostName = interconnectAddress; + YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket; + IActor* listener; + TMaybe<SOCKET> maybeSocket = socket < 0 + ? 
Nothing() + : TMaybe<SOCKET>(socket); + + listener = new NActors::TInterconnectListenerTCP(interconnectAddress, port, icCommon, maybeSocket); + + setup->LocalServices.emplace_back( + MakeInterconnectListenerActorId(false), + TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0)); + } + + YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete"; + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + + return std::make_tuple(std::move(setup), logSettings); + } + + std::tuple<TString, TString> GetLocalAddress(const TString* overrideHostname) { + constexpr auto MaxLocalHostNameLength = 4096; + std::array<char, MaxLocalHostNameLength> buffer; + buffer.fill(0); + TString hostName; + TString localAddress; + + int result = gethostname(buffer.data(), buffer.size() - 1); + if (result != 0) { + Cerr << "gethostname failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << strerror(errno) << Endl; + return std::make_tuple(hostName, localAddress); + } + + if (overrideHostname) { + memcpy(&buffer[0], overrideHostname->c_str(), Min<int>( + overrideHostname->size()+1, buffer.size()-1 + )); + } + + hostName = &buffer[0]; + + addrinfo request; + memset(&request, 0, sizeof(request)); + request.ai_family = AF_INET6; + request.ai_socktype = SOCK_STREAM; + + addrinfo* response = nullptr; + result = getaddrinfo(buffer.data(), nullptr, &request, &response); + if (result != 0) { + Cerr << "getaddrinfo failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << gai_strerror(result) << Endl; + return std::make_tuple(hostName, localAddress); + } + + std::unique_ptr<addrinfo, void (*)(addrinfo*)> holder(response, &freeaddrinfo); + + if (!response->ai_addr) { + Cerr << "getaddrinfo failed: no ai_addr" << Endl; + return std::make_tuple(hostName, localAddress); + } + + auto* sa = response->ai_addr; + Y_VERIFY(sa->sa_family == AF_INET6); + inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr), + &buffer[0], buffer.size() - 1); + + localAddress 
= &buffer[0]; + + return std::make_tuple(hostName, localAddress); + } + + std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& maybeUser, const TMaybe<TString>& maybeTokenFile) + { + auto home = GetEnv("HOME"); + auto systemUser = GetEnv("USER"); + + TString userName = maybeUser + ? *maybeUser + : systemUser; + + TString tokenFile = maybeTokenFile + ? *maybeTokenFile + : home + "/.yt/token"; + + TString token = TFileInput(tokenFile).ReadLine(); + + return std::make_tuple(userName, token); + } +} diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h new file mode 100644 index 00000000000..9e959cda160 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.h @@ -0,0 +1,51 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/interconnect/poller_tcp.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/yson/node/node.h> + +#include <ydb/library/yql/providers/dq/config/config.pb.h> + +namespace NYql::NDqs { + +enum class ENodeIdLimits { + MinServiceNodeId = 1, + MaxServiceNodeId = 512, // excluding + MinWorkerNodeId = 512, + MaxWorkerNodeId = 8192, // excluding +}; + +using TNameserverFactory = std::function<NActors::IActor*(const TIntrusivePtr<NActors::TTableNameserverSetup>& setup)>; + +struct TServiceNodeConfig { + ui32 NodeId; + TString InterconnectAddress; + TString GrpcHostname; + ui16 Port; + ui16 GrpcPort = 8080; + ui16 MbusPort = 0; + SOCKET Socket = -1; + SOCKET GrpcSocket = -1; + TMaybe<ui32> MaxNodeId; + NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings(); + TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { + return CreateNameserverTable(setup); + }; +}; + +std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr); +std::tuple<TString, TString> 
GetUserToken(const TMaybe<TString>& user, const TMaybe<TString>& tokenFile); + +std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup( + ui32 nodeId, + TString interconnectAddress, + ui16 port, + SOCKET socket, + TVector<ui32> threads, + NMonitoring::TDynamicCounterPtr counters, + const TNameserverFactory& nameserverFactory, + TMaybe<ui32> maxNodeId, + const NYql::NProto::TDqConfig::TICSettings& icSettings = NYql::NProto::TDqConfig::TICSettings()); + +} // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp new file mode 100644 index 00000000000..afc141dc537 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/service_node.cpp @@ -0,0 +1,167 @@ +#include "service_node.h" + +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/providers/common/metrics/metrics_registry.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> + +#include <library/cpp/grpc/server/actors/logger.h> + +#include <utility> + +namespace NYql { + using namespace NActors; + using namespace NGrpc; + using namespace NYql::NDqs; + + class TGrpcExternalListener: public IExternalListener { + public: + TGrpcExternalListener(SOCKET socket) + : Socket(socket) + , Listener(MakeIntrusive<NInterconnect::TStreamSocket>(Socket)) + { + SetNonBlock(socket, true); + } + + void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) override { + Acceptor = std::move(acceptor); + } + + private: + void Start() override { + YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener"; + Poller.Start(); + StartRead(); + } + + void Stop() override { + Poller.Stop(); + } + + void StartRead() { + YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event"; + Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) { + return Accept(ss); + }); + } + + TDelegate 
Accept(const TIntrusivePtr<TSharedDescriptor>& ss) + { + NInterconnect::TStreamSocket* socket = (NInterconnect::TStreamSocket*)ss.Get(); + int r = 0; + while (r >= 0) { + NInterconnect::TAddress address; + r = socket->Accept(address); + if (r >= 0) { + YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection"; + grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params; + SetNonBlock(r, true); + params.listener_fd = -1; // static_cast<int>(Socket); + params.fd = r; + Acceptor->HandleNewConnection(&params); + } else if (-r != EAGAIN && -r != EWOULDBLOCK) { + YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r); + } + } + + return [this] { + StartRead(); + }; + } + + SOCKET Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Listener; + NInterconnect::TPollerThreads Poller; + std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> Acceptor; + }; + + TServiceNode::TServiceNode( + const TServiceNodeConfig& config, + ui32 threads, + IMetricsRegistryPtr metricsRegistry) + : Config(config) + , Threads(threads) + , MetricsRegistry(std::move(metricsRegistry)) + { + std::tie(Setup, LogSettings) = BuildActorSetup( + Config.NodeId, + Config.InterconnectAddress, + Config.Port, + Config.Socket, + {Threads, 8}, + MetricsRegistry->GetSensors(), + Config.NameserverFactory, + Config.MaxNodeId, + Config.ICSettings); + } + + void TServiceNode::AddLocalService(TActorId actorId, TActorSetupCmd service) { + YQL_ENSURE(!ActorSystem); + Setup->LocalServices.emplace_back(actorId, std::move(service)); + } + + NActors::TActorSystem* TServiceNode::StartActorSystem(void* appData) { + Y_VERIFY(!ActorSystem); + + ActorSystem = MakeHolder<NActors::TActorSystem>(Setup, appData, LogSettings); + ActorSystem->Start(); + + return ActorSystem.Get(); + } + + void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) { + class TCustomOption : public grpc::ServerBuilderOption { + public: + TCustomOption() { } + + void 
UpdateArguments(grpc::ChannelArguments *args) override { + args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 1); + } + + void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override + { } + }; + + YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort; + + IExternalListener::TPtr listener = nullptr; + if (Config.GrpcSocket >= 0) { + listener = MakeIntrusive<TGrpcExternalListener>(Config.GrpcSocket); + } + + auto options = TServerOptions() + // .SetHost(CurrentNode->Address) + .SetHost("[::]") + .SetPort(Config.GrpcPort) + .SetExternalListener(listener) + .SetWorkerThreads(2) + .SetGRpcMemoryQuotaBytes(1024 * 1024 * 1024) + .SetMaxMessageSize(1024 * 1024 * 256) + .SetMaxGlobalRequestInFlight(50000) + .SetUseAuth(false) + .SetKeepAliveEnable(true) + .SetKeepAliveIdleTimeoutTriggerSec(360) + .SetKeepAliveMaxProbeCount(3) + .SetKeepAliveProbeIntervalSec(1) + .SetServerBuilderMutator([](grpc::ServerBuilder& builder) { + builder.SetOption(std::make_unique<TCustomOption>()); + }) + .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER + + Server = MakeHolder<TGRpcServer>(options); + Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories)); + Server->AddService(Service); + Server->Start(); + } + + void TServiceNode::Stop(TDuration timeout) { + (static_cast<TDqsGrpcService*>(Service.Get()))->Stop().Wait(timeout); + + Server->Stop(); + for (auto id : ActorIds) { + ActorSystem->Send(id, new NActors::TEvents::TEvPoison); + } + ActorSystem->Stop(); + } +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h new file mode 100644 index 00000000000..3f60fb3cff9 --- /dev/null +++ b/ydb/library/yql/providers/dq/service/service_node.h @@ -0,0 +1,48 @@ +#pragma once + +#include "grpc_service.h" +#include "interconnect_helpers.h" + +#include 
<ydb/library/yql/providers/common/metrics/metrics_registry.h> +#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> + +#include <ydb/library/yql/minikql/mkql_function_registry.h> + +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_common.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include <library/cpp/actors/interconnect/poller_actor.h> + +namespace NYql { + class TServiceNode { + public: + TServiceNode( + const NDqs::TServiceNodeConfig& config, + ui32 threads, + IMetricsRegistryPtr metricsRegistry); + + void AddLocalService(NActors::TActorId actorId, NActors::TActorSetupCmd service); + NActors::TActorSystem* StartActorSystem(void* appData = nullptr); + void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); + + void Stop(TDuration time = TDuration::Max()); + + NActors::TActorSystemSetup* GetSetup() const { + return Setup.Get(); + } + + private: + NDqs::TServiceNodeConfig Config; + ui32 Threads; + IMetricsRegistryPtr MetricsRegistry; + THolder<NActors::TActorSystemSetup> Setup; + TIntrusivePtr<NActors::NLog::TSettings> LogSettings; + THolder<NActors::TActorSystem> ActorSystem; + TVector<NActors::TActorId> ActorIds; + THolder<NGrpc::TGRpcServer> Server; + TIntrusivePtr<NGrpc::IGRpcService> Service; + }; +} diff --git a/ydb/library/yql/providers/dq/service/ya.make b/ydb/library/yql/providers/dq/service/ya.make new file mode 100644 index 00000000000..5652eaf0f5c --- /dev/null +++ b/ydb/library/yql/providers/dq/service/ya.make @@ -0,0 +1,33 @@ +LIBRARY() + +SRCS( + grpc_service.cpp + grpc_session.cpp + service_node.cpp + interconnect_helpers.cpp +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/dnsresolver + 
library/cpp/actors/interconnect + library/cpp/build_info + library/cpp/grpc/server + library/cpp/grpc/server/actors + library/cpp/svnversion + library/cpp/threading/future + ydb/library/yql/sql + ydb/public/api/protos + ydb/library/yql/providers/common/metrics + ydb/library/yql/providers/dq/actors + ydb/library/yql/providers/dq/api/grpc + ydb/library/yql/providers/dq/common + ydb/library/yql/providers/dq/counters + ydb/library/yql/providers/dq/interface + ydb/library/yql/providers/dq/worker_manager + ydb/library/yql/providers/dq/worker_manager/interface +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..233f0fe82bc --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-stats_collector) +target_compile_options(providers-dq-stats_collector PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-stats_collector PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-helpers + cpp-monlib-dynamic_counters +) +target_sources(providers-dq-stats_collector PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp +) diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..bf5c2e8374e --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-stats_collector) +target_compile_options(providers-dq-stats_collector PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-stats_collector PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-helpers + cpp-monlib-dynamic_counters +) +target_sources(providers-dq-stats_collector PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp +) diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..bf5c2e8374e --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-dq-stats_collector) +target_compile_options(providers-dq-stats_collector PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-dq-stats_collector PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-helpers + cpp-monlib-dynamic_counters +) +target_sources(providers-dq-stats_collector PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp +) diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt new file mode 100644 index 00000000000..606ff46b4be --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp new file mode 100644 index 00000000000..b9a1303b3be --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp @@ -0,0 +1,31 @@ +#include "pool_stats_collector.h" + +#include <library/cpp/actors/helpers/pool_stats_collector.h> + +namespace NYql { + +using namespace NActors; +using namespace NMonitoring; + +namespace { + +TIntrusivePtr<TDynamicCounters> GetServiceCounters(TIntrusivePtr<TDynamicCounters> root, + const TString &service) +{ + auto res = root->GetSubgroup("counters", service); + auto utils = root->GetSubgroup("counters", "utils"); + auto lookupCounter = utils->GetSubgroup("component", service)->GetCounter("CounterLookups", true); + res->SetLookupCounter(lookupCounter); + return res; +} + +} + +IActor *CreateStatsCollector(ui32 intervalSec, + const TActorSystemSetup& setup, + NMonitoring::TDynamicCounterPtr counters) +{ + return new NActors::TStatsCollectingActor(intervalSec, setup, GetServiceCounters(counters, "utils")); +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h new file mode 100644 index 00000000000..77c2cab9a60 --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h @@ -0,0 +1,18 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include 
<library/cpp/monlib/dynamic_counters/counters.h> + +namespace NActors { + struct TActorSystemSetup; +} + +// copy from kikimr/core/base/pool_stats_collector.h + +namespace NYql { + + NActors::IActor* CreateStatsCollector(ui32 intervalSec, + const NActors::TActorSystemSetup& setup, + NMonitoring::TDynamicCounterPtr counters); + +} diff --git a/ydb/library/yql/providers/dq/stats_collector/ya.make b/ydb/library/yql/providers/dq/stats_collector/ya.make new file mode 100644 index 00000000000..dcd30c4e90a --- /dev/null +++ b/ydb/library/yql/providers/dq/stats_collector/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SET( + SOURCE + pool_stats_collector.cpp +) + +SRCS( + ${SOURCE} +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/helpers + library/cpp/monlib/dynamic_counters +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..c762ef5d46c --- /dev/null +++ b/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(codec) +add_subdirectory(common) +add_subdirectory(comp_nodes) +add_subdirectory(dq_task_preprocessor) +add_subdirectory(expr_nodes) +add_subdirectory(gateway) +add_subdirectory(job) +add_subdirectory(lib) +add_subdirectory(mkql_dq) +add_subdirectory(opt) +add_subdirectory(provider) diff --git a/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..c762ef5d46c --- /dev/null +++ b/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(codec) +add_subdirectory(common) +add_subdirectory(comp_nodes) +add_subdirectory(dq_task_preprocessor) +add_subdirectory(expr_nodes) +add_subdirectory(gateway) +add_subdirectory(job) +add_subdirectory(lib) +add_subdirectory(mkql_dq) +add_subdirectory(opt) +add_subdirectory(provider) diff --git a/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..c762ef5d46c --- /dev/null +++ b/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(codec) +add_subdirectory(common) +add_subdirectory(comp_nodes) +add_subdirectory(dq_task_preprocessor) +add_subdirectory(expr_nodes) +add_subdirectory(gateway) +add_subdirectory(job) +add_subdirectory(lib) +add_subdirectory(mkql_dq) +add_subdirectory(opt) +add_subdirectory(provider) diff --git a/ydb/library/yql/providers/yt/CMakeLists.txt b/ydb/library/yql/providers/yt/CMakeLists.txt index 8e464fc08e9..f8b31df0c11 100644 --- a/ydb/library/yql/providers/yt/CMakeLists.txt +++ b/ydb/library/yql/providers/yt/CMakeLists.txt @@ -6,13 +6,12 @@ # original buildsystem will not be accepted. -add_subdirectory(codec) -add_subdirectory(common) -add_subdirectory(comp_nodes) -add_subdirectory(expr_nodes) -add_subdirectory(gateway) -add_subdirectory(job) -add_subdirectory(lib) -add_subdirectory(mkql_dq) -add_subdirectory(opt) -add_subdirectory(provider) +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..8e464fc08e9 --- /dev/null +++ b/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(codec) +add_subdirectory(common) +add_subdirectory(comp_nodes) +add_subdirectory(expr_nodes) +add_subdirectory(gateway) +add_subdirectory(job) +add_subdirectory(lib) +add_subdirectory(mkql_dq) +add_subdirectory(opt) +add_subdirectory(provider) diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..7d03fe3302f --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-yt-dq_task_preprocessor) +target_compile_options(providers-yt-dq_task_preprocessor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-yson + cpp-yson-node + cpp-mapreduce-common + cpp-mapreduce-interface + library-yql-utils + yql-utils-log + yql-utils-failure_injector + library-yql-minikql + minikql-computation-llvm + providers-common-codec + providers-dq-interface + providers-yt-codec + yt-gateway-lib + yt-lib-yson_helpers +) +target_sources(providers-yt-dq_task_preprocessor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp +) diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..b03d0481c58 --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt @@ -0,0 +1,35 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-yt-dq_task_preprocessor) +target_compile_options(providers-yt-dq_task_preprocessor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-yson + cpp-yson-node + cpp-mapreduce-common + cpp-mapreduce-interface + library-yql-utils + yql-utils-log + yql-utils-failure_injector + library-yql-minikql + minikql-computation-llvm + providers-common-codec + providers-dq-interface + providers-yt-codec + yt-gateway-lib + yt-lib-yson_helpers +) +target_sources(providers-yt-dq_task_preprocessor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp +) diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..b03d0481c58 --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt @@ -0,0 +1,35 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(providers-yt-dq_task_preprocessor) +target_compile_options(providers-yt-dq_task_preprocessor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-yson + cpp-yson-node + cpp-mapreduce-common + cpp-mapreduce-interface + library-yql-utils + yql-utils-log + yql-utils-failure_injector + library-yql-minikql + minikql-computation-llvm + providers-common-codec + providers-dq-interface + providers-yt-codec + yt-gateway-lib + yt-lib-yson_helpers +) +target_sources(providers-yt-dq_task_preprocessor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp +) diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt new file mode 100644 index 00000000000..606ff46b4be --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make b/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make new file mode 100644 index 00000000000..2e43b4600b3 --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make @@ -0,0 +1,24 @@ +LIBRARY() + +SRC(yql_yt_dq_task_preprocessor.cpp) + +PEERDIR( + library/cpp/yson + library/cpp/yson/node + yt/cpp/mapreduce/common + yt/cpp/mapreduce/interface + ydb/library/yql/utils + ydb/library/yql/utils/log + ydb/library/yql/utils/failure_injector + ydb/library/yql/minikql + ydb/library/yql/minikql/computation/llvm + ydb/library/yql/providers/common/codec + ydb/library/yql/providers/dq/interface + ydb/library/yql/providers/yt/codec + ydb/library/yql/providers/yt/gateway/lib + ydb/library/yql/providers/yt/lib/yson_helpers +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp new file mode 100644 index 00000000000..8251e293078 --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp @@ -0,0 +1,365 @@ +#include "yql_yt_dq_task_preprocessor.h" + +#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h> +#include <ydb/library/yql/providers/yt/codec/yt_codec.h> +#include <ydb/library/yql/providers/yt/codec/yt_codec_io.h> +#include <ydb/library/yql/providers/yt/lib/yson_helpers/yson_helpers.h> +#include 
<ydb/library/yql/providers/common/codec/yql_codec.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/failure_injector/failure_injector.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_mem_info.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <yt/cpp/mapreduce/client/client.h> +#include <yt/cpp/mapreduce/interface/io.h> + +#include <library/cpp/yson/node/node.h> +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/node/node_visitor.h> +#include <library/cpp/yson/parser.h> +#include <library/cpp/yson/writer.h> + +#include <util/generic/hash.h> +#include <util/generic/guid.h> +#include <util/generic/yexception.h> +#include <util/generic/ptr.h> +#include <util/system/env.h> +#include <util/stream/str.h> +#include <util/stream/file.h> +#include <util/stream/mem.h> + +#include <string_view> + +namespace NYql::NDq { + +using namespace NKikimr::NMiniKQL; + +namespace { + +class TYtFullResultWriter: public IDqFullResultWriter { +public: + TYtFullResultWriter(IFunctionRegistry::TPtr funcRegistry, NYT::ITransactionPtr tx, const TString& path, const TString& spec) + : FuncRegistry_(std::move(funcRegistry)) + , Alloc(__LOCATION__) + , TypeEnv(Alloc) + , MemInfo("DqFullResultWriter") + , HolderFactory(Alloc.Ref(), MemInfo, FuncRegistry_.Get()) + , CodecContext(TypeEnv, *FuncRegistry_, &HolderFactory) + , Specs() + { + Specs.SetUseSkiff(""); + Specs.Init(CodecContext, spec); + OutStream = tx->CreateRawWriter(NYT::TRichYPath{path}, Specs.MakeOutputFormat(), NYT::TTableWriterOptions()); + TableWriter = MakeHolder<TMkqlWriterImpl>(OutStream, 4_MB); + TableWriter->SetSpecs(Specs); + Alloc.Release(); + } + + ~TYtFullResultWriter() { + try { + Abort(); + } catch (...) 
{ + } + Alloc.Acquire(); + }; + + void AddRow(const NUdf::TUnboxedValuePod& row) override { + YQL_ENSURE(!Finished); + try { + TableWriter->AddRow(row); + ++RowCount; + } catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + } + + void Finish() override { + if (!Finished) { + try { + TableWriter->Finish(); + } catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + Finished = true; + } + } + + void Abort() override { + if (!Finished) { + try { + TableWriter->Abort(); + OutStream->Abort(); + } catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + Finished = true; + } + } + + ui64 GetRowCount() const override { + return RowCount; + } + +private: + const IFunctionRegistry::TPtr FuncRegistry_; + TScopedAlloc Alloc; + TTypeEnvironment TypeEnv; + TMemoryUsageInfo MemInfo; + THolderFactory HolderFactory; + NCommon::TCodecContext CodecContext; + TMkqlIOSpecs Specs; + NYT::TRawTableWriterPtr OutStream; + THolder<TMkqlWriterImpl> TableWriter; + ui64 RowCount{0}; + bool Finished = false; +}; + +class TFileFullResultWriter: public IDqFullResultWriter { +public: + TFileFullResultWriter(IFunctionRegistry::TPtr funcRegistry, const TString& path, const TString& spec, const TString& attrs) + : Path(path) + , Attrs(attrs) + , FuncRegistry_(std::move(funcRegistry)) + , Alloc(__LOCATION__) + , TypeEnv(Alloc) + , MemInfo("DqFullResultWriter") + , HolderFactory(Alloc.Ref(), MemInfo, FuncRegistry_.Get()) + , CodecContext(TypeEnv, *FuncRegistry_, &HolderFactory) + , Specs() + { + Specs.Init(CodecContext, spec); + TableWriter = MakeHolder<TMkqlWriterImpl>(OutStream, 1, 4_MB); + TableWriter->SetSpecs(Specs); + 
Alloc.Release(); + } + + ~TFileFullResultWriter() { + Alloc.Acquire(); + }; + + void AddRow(const NUdf::TUnboxedValuePod& row) override { + YQL_ENSURE(!Finished); + TableWriter->AddRow(row); + ++RowCount; + } + + void Finish() override { + if (!Finished) { + TableWriter->Finish(); + if (auto binaryYson = OutStream.Str()) { + TMemoryInput in(binaryYson); + TOFStream of(Path); + TDoubleHighPrecisionYsonWriter writer(&of, ::NYson::EYsonType::ListFragment); + NYson::TYsonParser parser(&writer, &in, ::NYson::EYsonType::ListFragment); + parser.Parse(); + } + else { + YQL_ENSURE(TFile(Path, CreateAlways | WrOnly).IsOpen(), "Failed to create " << Path.Quote() << " file"); + } + + { + NYT::TNode attrs = NYT::NodeFromYsonString(Attrs); + TOFStream ofAttr(Path + ".attr"); + NYson::TYsonWriter writer(&ofAttr, NYson::EYsonFormat::Pretty, ::NYson::EYsonType::Node); + NYT::TNodeVisitor visitor(&writer); + visitor.Visit(attrs); + } + + Finished = true; + } + } + + void Abort() override { + Finished = true; + } + + ui64 GetRowCount() const override { + return RowCount; + } + +private: + const TString Path; + const TString Attrs; + const IFunctionRegistry::TPtr FuncRegistry_; + TScopedAlloc Alloc; + TTypeEnvironment TypeEnv; + TMemoryUsageInfo MemInfo; + THolderFactory HolderFactory; + NCommon::TCodecContext CodecContext; + TMkqlIOSpecs Specs; + TStringStream OutStream; + THolder<TMkqlWriterImpl> TableWriter; + ui64 RowCount{0}; + bool Finished = false; +}; + +} // unnamed + +class TYtDqTaskPreprocessor : public IDqTaskPreprocessor { +public: + explicit TYtDqTaskPreprocessor(bool ytEmulationMode, IFunctionRegistry::TPtr funcRegistry) + : YtEmulationMode_(ytEmulationMode) + , FuncRegistry_(std::move(funcRegistry)) + { + } + + TYtDqTaskPreprocessor(TYtDqTaskPreprocessor&&) = default; + TYtDqTaskPreprocessor& operator=(TYtDqTaskPreprocessor&&) = default; + + THashMap<TString, TString> GetTaskParams(const THashMap<TString, TString>& graphParams, const THashMap<TString, TString>& 
secureParams) final { + YQL_LOG_CTX_SCOPE(TStringBuf("YtDqTaskPreprocessor"), __FUNCTION__); + THashMap<TString, TString> result; + + GraphParams_ = graphParams; + SecureParams_ = secureParams; + if (YtEmulationMode_) { + YQL_LOG(DEBUG) << "No nested transactions are created in YT emulation mode, skipping"; + result["yt.write.tx"] = GetGuidAsString(TGUID()); + } else if (auto p = graphParams.FindPtr("yt.write")) { + try { + auto params = NYT::NodeFromYsonString(*p); + auto rootTxId = params["root_tx"].AsString(); + auto server = params["server"].AsString(); + auto token = params["token"].AsString(); + auto& client = Clients_[std::make_pair(server, token)]; + NYT::TCreateClientOptions opts; + if (token) { + opts.Token(secureParams.Value(token, "")); + } + client = NYT::CreateClient(server, std::move(opts)); + auto rootTx = client->AttachTransaction(GetGuid(rootTxId)); + auto subTx = rootTx->StartTransaction(); + auto subTxId = GetGuidAsString(subTx->GetId()); + TFailureInjector::Reach("expire_tx", [&] { + subTx->Abort(); + subTx = rootTx->StartTransaction(NYT::TStartTransactionOptions().Timeout(TDuration::Seconds(1)).AutoPingable(false)); + subTxId = GetGuidAsString(subTx->GetId()); + ::Sleep(TDuration::Seconds(1)); + }); + + YQL_LOG(DEBUG) << "Cluster " << server << ", creating nested " << subTxId << " for " << rootTxId; + result["yt.write.tx"] = subTxId; + WriteSubTx_ = std::move(subTx); + } + catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? 
e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + } + return result; + } + + void Finish(bool success) override { + GraphParams_.clear(); + SecureParams_.clear(); + try { + if (WriteSubTx_) { + if (success) { + YQL_LOG(DEBUG) << "Committing " << GetGuidAsString(WriteSubTx_->GetId()); + TFailureInjector::Reach("fail_commit", [] { throw yexception() << "fail_commit"; }); + WriteSubTx_->Commit(); + } else { + YQL_LOG(DEBUG) << "Aborting " << GetGuidAsString(WriteSubTx_->GetId()); + WriteSubTx_->Abort(); + } + WriteSubTx_.Reset(); + } + if (FullResultSubTx_) { + if (success) { + YQL_LOG(DEBUG) << "Committing " << GetGuidAsString(FullResultSubTx_->GetId()); + TFailureInjector::Reach("full_result_fail_commit", [] { throw yexception() << "full_result_fail_commit"; }); + FullResultSubTx_->Commit(); + } else { + YQL_LOG(DEBUG) << "Aborting " << GetGuidAsString(FullResultSubTx_->GetId()); + FullResultSubTx_->Abort(); + } + FullResultSubTx_.Reset(); + } + Clients_.clear(); + } catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? 
e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + } + + THolder<IDqFullResultWriter> CreateFullResultWriter() override { + if (auto p = GraphParams_.FindPtr("yt.full_result_table")) { + try { + auto params = NYT::NodeFromYsonString(*p); + if (YtEmulationMode_) { + return MakeHolder<TFileFullResultWriter>(FuncRegistry_, params["path"].AsString(), params["codecSpec"].AsString(), params["tableAttrs"].AsString()); + } else { + auto server = params["server"].AsString(); + auto token = params["token"].AsString(); + auto& client = Clients_[std::make_pair(server, token)]; + if (!client) { + NYT::TCreateClientOptions opts; + if (token) { + opts.Token(SecureParams_.Value(token, "")); + } + client = NYT::CreateClient(server, std::move(opts)); + } + + NYT::ITransactionPtr subTx; + NYT::IClientBasePtr parentTx = client; + if (params.HasKey("root_tx")) { + auto rootTx = client->AttachTransaction(GetGuid(params["root_tx"].AsString())); + subTx = rootTx->StartTransaction(); + if (params.HasKey("external_tx")) { + parentTx = client->AttachTransaction(GetGuid(params["external_tx"].AsString())); + } + } else if (params.HasKey("external_tx")) { + parentTx = client->AttachTransaction(GetGuid(params["external_tx"].AsString())); + subTx = parentTx->StartTransaction(); + } else { + subTx = client->StartTransaction(); + } + + auto path = params["path"].AsString(); + CreateParents({path}, parentTx); + + subTx->Create(path, NYT::NT_TABLE, + NYT::TCreateOptions().Force(true).Attributes(NYT::NodeFromYsonString(params["tableAttrs"].AsString())) + ); + + TFailureInjector::Reach("full_result_fail_create", [&] { throw yexception() << "full_result_fail_create"; }); + + FullResultSubTx_ = subTx; + + return MakeHolder<TYtFullResultWriter>(FuncRegistry_, subTx, path, params["codecSpec"].AsString()); + } + } + catch (const NYT::TErrorResponse& e) { + TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? 
e.GetError().ShortDescription() : TString(e.what()); + throw yexception() << errMsg; + } + } + + return {}; + } + +private: + THashMap<TString, TString> GraphParams_; + THashMap<TString, TString> SecureParams_; + THashMap<std::pair<TString, TString>, NYT::IClientPtr> Clients_; + NYT::ITransactionPtr WriteSubTx_; + NYT::ITransactionPtr FullResultSubTx_; + + const bool YtEmulationMode_; + const IFunctionRegistry::TPtr FuncRegistry_; +}; + +TDqTaskPreprocessorFactory CreateYtDqTaskPreprocessorFactory(bool ytEmulationMode, IFunctionRegistry::TPtr funcRegistry) { + return [=]() { + return new TYtDqTaskPreprocessor(ytEmulationMode, funcRegistry); + }; +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h new file mode 100644 index 00000000000..6939505b89b --- /dev/null +++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> + +namespace NYql::NDq { + +TDqTaskPreprocessorFactory CreateYtDqTaskPreprocessorFactory(bool ytEmulationMode, NKikimr::NMiniKQL::IFunctionRegistry::TPtr funcRegistry); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt index 02221978de6..ccad74919ca 100644 --- a/ydb/library/yql/tools/CMakeLists.txt +++ b/ydb/library/yql/tools/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(astdiff) +add_subdirectory(dqrun) add_subdirectory(mrjob) add_subdirectory(sql2yql) add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..5b83f580994 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt @@ 
-0,0 +1,94 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dqrun) +target_compile_options(dqrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dqrun PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + contrib-libs-protobuf + client-ydb_persqueue_public-codecs + cpp-actors-http + library-cpp-getopt + cpp-lfalloc-alloc_profiler + library-cpp-logger + library-cpp-resource + library-cpp-yson + cpp-mapreduce-interface + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + yql-core-services + core-services-mounts + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + minikql-invoke_builtins-llvm + providers-clickhouse-actors + providers-clickhouse-provider + providers-common-comp_nodes + providers-common-proto + providers-common-udf_resolve + providers-generic-actors + providers-generic-provider + providers-dq-local_gateway + providers-dq-provider + dq-provider-exec + providers-pq-async_io + pq-gateway-native + providers-pq-provider + providers-s3-actors + providers-s3-provider + providers-solomon-gateway + providers-solomon-provider + providers-ydb-actors + providers-ydb-comp_nodes + providers-ydb-provider + udf-service-terminate_policy + yql-utils-backtrace + yql-utils-bindings + yql-utils-log + yql-core-url_preprocessing + yql-core-url_lister + yt-comp_nodes-dq + providers-yt-dq_task_preprocessor + yt-gateway-file + yt-gateway-native + providers-yt-mkql_dq + providers-yt-provider + yt-lib-yt_download + yt-lib-yt_url_lister + yt-lib-config_clusters + 
yql-parser-pg_wrapper + utils-log-proto + yql-utils-actor_system + fq-libs-actors + fq-libs-db_id_async_resolver_impl + clickhouse_client_udf +) +target_link_options(dqrun PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(dqrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp +) +target_allocator(dqrun + cpp-malloc-jemalloc +) +vcs_info(dqrun) diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..e5251bfa438 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt @@ -0,0 +1,98 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(dqrun) +target_compile_options(dqrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dqrun PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + client-ydb_persqueue_public-codecs + cpp-actors-http + library-cpp-getopt + cpp-lfalloc-alloc_profiler + library-cpp-logger + library-cpp-resource + library-cpp-yson + cpp-mapreduce-interface + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + yql-core-services + core-services-mounts + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + minikql-invoke_builtins-llvm + providers-clickhouse-actors + providers-clickhouse-provider + providers-common-comp_nodes + providers-common-proto + providers-common-udf_resolve + providers-generic-actors + providers-generic-provider + providers-dq-local_gateway + providers-dq-provider + dq-provider-exec + providers-pq-async_io + pq-gateway-native + providers-pq-provider + providers-s3-actors + providers-s3-provider + providers-solomon-gateway + providers-solomon-provider + providers-ydb-actors + providers-ydb-comp_nodes + providers-ydb-provider + udf-service-terminate_policy + yql-utils-backtrace + yql-utils-bindings + yql-utils-log + yql-core-url_preprocessing + yql-core-url_lister + yt-comp_nodes-dq + providers-yt-dq_task_preprocessor + yt-gateway-file + yt-gateway-native + providers-yt-mkql_dq + providers-yt-provider + yt-lib-yt_download + yt-lib-yt_url_lister + yt-lib-config_clusters + yql-parser-pg_wrapper + utils-log-proto + yql-utils-actor_system + fq-libs-actors + fq-libs-db_id_async_resolver_impl + clickhouse_client_udf +) +target_link_options(dqrun PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(dqrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp +) +target_allocator(dqrun + cpp-malloc-jemalloc +) +vcs_info(dqrun) 
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..9d712a457d5 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt @@ -0,0 +1,99 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dqrun) +target_compile_options(dqrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dqrun PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + contrib-libs-protobuf + client-ydb_persqueue_public-codecs + cpp-actors-http + library-cpp-getopt + cpp-lfalloc-alloc_profiler + library-cpp-logger + library-cpp-resource + library-cpp-yson + cpp-mapreduce-interface + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + yql-core-services + core-services-mounts + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + minikql-invoke_builtins-llvm + providers-clickhouse-actors + providers-clickhouse-provider + providers-common-comp_nodes + providers-common-proto + providers-common-udf_resolve + providers-generic-actors + providers-generic-provider + providers-dq-local_gateway + providers-dq-provider + dq-provider-exec + providers-pq-async_io + pq-gateway-native + providers-pq-provider + providers-s3-actors + providers-s3-provider + providers-solomon-gateway + providers-solomon-provider + providers-ydb-actors + providers-ydb-comp_nodes + providers-ydb-provider + udf-service-terminate_policy + yql-utils-backtrace + 
yql-utils-bindings + yql-utils-log + yql-core-url_preprocessing + yql-core-url_lister + yt-comp_nodes-dq + providers-yt-dq_task_preprocessor + yt-gateway-file + yt-gateway-native + providers-yt-mkql_dq + providers-yt-provider + yt-lib-yt_download + yt-lib-yt_url_lister + yt-lib-config_clusters + yql-parser-pg_wrapper + utils-log-proto + yql-utils-actor_system + fq-libs-actors + fq-libs-db_id_async_resolver_impl + clickhouse_client_udf +) +target_link_options(dqrun PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(dqrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp +) +target_allocator(dqrun + cpp-malloc-jemalloc +) +vcs_info(dqrun) diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.txt b/ydb/library/yql/tools/dqrun/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..59bf65b0381 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt @@ -0,0 +1,14 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(yql-tools-dqrun INTERFACE) +target_link_libraries(yql-tools-dqrun INTERFACE + contrib-libs-cxxsupp + yutil +) diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp new file mode 100644 index 00000000000..82c121ac385 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -0,0 +1,932 @@ +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.h> +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> +#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h> +#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h> +#include <ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h> +#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> +#include <ydb/library/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h> +#include <ydb/library/yql/providers/yt/lib/config_clusters/config_clusters.h> +#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h> + +#include <ydb/library/yql/utils/log/proto/logger_config.pb.h> +#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> +#include <ydb/library/yql/utils/actor_system/manager.h> + +#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h> +#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h> +#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h> +#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h> +#include 
<ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h> +#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h> +#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h> +#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h> +#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h> +#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h> +#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h> +#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h> +#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h> +#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h> +#include <ydb/library/yql/providers/function/gateway/dq_function_gateway.h> +#include <ydb/library/yql/providers/function/provider/dq_function_provider.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h> +#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h> +#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h> +#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> +#include <ydb/library/yql/providers/common/metrics/protos/metrics_registry.pb.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h> +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> +#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include 
<ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/mkql_utils.h> +#include <ydb/library/yql/protos/yql_mount.pb.h> +#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/core/file_storage/http_download/http_download.h> +#include <ydb/library/yql/core/file_storage/file_storage.h> +#include <ydb/library/yql/core/facade/yql_facade.h> +#include <ydb/library/yql/core/services/mounts/yql_mounts.h> +#include <ydb/library/yql/core/services/yql_out_transformers.h> +#include <ydb/library/yql/core/url_lister/url_lister_manager.h> +#include <ydb/library/yql/core/yql_library_compiler.h> +#include <ydb/library/yql/utils/log/tls_backend.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> +#include <ydb/library/yql/utils/bindings/utils.h> + +#include <ydb/core/fq/libs/actors/database_resolver.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h> +#include <ydb/core/util/pb.h> + +#include <kikimr/yndx/cm_client/client.h> +#include <yt/cpp/mapreduce/interface/init.h> + +#include <library/cpp/yson/public.h> +#include <library/cpp/yson/writer.h> +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/logger/priority.h> +#include <library/cpp/protobuf/util/pb_io.h> +#include <library/cpp/actors/http/http_proxy.h> + +#include <util/generic/string.h> +#include <util/generic/hash.h> +#include <util/generic/scope.h> +#include <util/generic/vector.h> +#include <util/stream/output.h> +#include <util/stream/file.h> +#include <util/system/user.h> +#include <util/system/env.h> +#include <util/system/file.h> +#include <util/string/strip.h> + +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include 
<library/cpp/lfalloc/alloc_profiler/profiler.h> +#endif + +using namespace NKikimr; +using namespace NYql; + +struct TRunOptions { + bool Sql = false; + TString User; + TMaybe<TString> BindingsFile; + NYson::EYsonFormat ResultsFormat; + bool OptimizeOnly = false; + bool PeepholeOnly = false; + bool TraceOpt = false; + IOutputStream* StatisticsStream = nullptr; + bool PrintPlan = false; + bool AnalyzeQuery = false; + bool AnsiLexer = false; + IOutputStream* ExprOut = nullptr; + IOutputStream* ResultOut = &Cout; + IOutputStream* ErrStream = &Cerr; + IOutputStream* TracePlan = &Cerr; +}; + +class TStoreMappingFunctor: public NLastGetopt::IOptHandler { +public: + TStoreMappingFunctor(THashMap<TString, TString>* target, char delim = '@') + : Target(target) + , Delim(delim) + { + } + + void HandleOpt(const NLastGetopt::TOptsParser* parser) final { + const TStringBuf val(parser->CurValOrDef()); + const auto service = TString(val.After(Delim)); + auto res = Target->emplace(TString(val.Before(Delim)), service); + if (!res.second) { + /// force replace already exist parameter + res.first->second = service; + } + } + +private: + THashMap<TString, TString>* Target; + char Delim; +}; + +void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config) { + auto configData = TFileInput(configFile ? configFile : "../../cfg/local/gateways.conf").ReadAll(); + + using ::google::protobuf::TextFormat; + if (!TextFormat::ParseFromString(configData, config)) { + ythrow yexception() << "Bad format of gateways configuration"; + } +} + +TFileStoragePtr CreateFS(const TString& paramsFile, const TString& defYtServer) { + TFileStorageConfig params; + LoadFsConfigFromFile(paramsFile ? 
paramsFile : "../../cfg/local/fs.conf", params); + return WithAsync(CreateFileStorage(params, {MakeYtDownloader(params, defYtServer)})); +} + +void FillUsedFiles( + const TVector<TString>& filesMappingList, + TUserDataTable& filesMapping) +{ + for (auto& s : filesMappingList) { + TStringBuf fileName, filePath; + TStringBuf(s).Split('@', fileName, filePath); + if (fileName.empty() || filePath.empty()) { + ythrow yexception() << "Incorrect file mapping, expected form " + "name@path, e.g. MyFile@file.txt"; + } + + auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)]; + entry.Type = EUserDataType::PATH; + entry.Data = filePath; + } +} + +bool FillUsedUrls( + const TVector<TString>& urlMappingList, + TUserDataTable& filesMapping) +{ + for (auto& s : urlMappingList) { + TStringBuf name, url; + TStringBuf(s).Split('@', name, url); + if (name.empty() || url.empty()) { + Cerr << "Incorrect url mapping, expected form name@url, " + "e.g. MyUrl@http://example.com/file" << Endl; + return false; + } + + auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)]; + entry.Type = EUserDataType::URL; + entry.Data = url; + } + return true; +} + +class TOptPipelineConfigurator : public IPipelineConfigurator { +public: + TOptPipelineConfigurator(TProgramPtr prg, bool printPlan, IOutputStream* tracePlan) + : Program(std::move(prg)), PrintPlan(printPlan), TracePlan(tracePlan) + { + } + + void AfterCreate(TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { + pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE), + "OptTrace", TIssuesIds::CORE, "OptTrace"); + } + + void AfterOptimize(TTransformationPipeline* pipeline) const final { + if (PrintPlan) { + pipeline->Add(TPlanOutputTransformer::Sync(TracePlan, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput"); + } + } + 
+private: + TProgramPtr Program; + bool PrintPlan; + IOutputStream* TracePlan; +}; + +NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway, NYql::NConnector::IClient::TPtr genericClient, size_t HTTPmaxTimeSeconds, size_t maxRetriesCount) { + auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); + RegisterDqPqReadActorFactory(*factory, driver, nullptr); + RegisterYdbReadActorFactory(*factory, driver, nullptr); + RegisterS3ReadActorFactory(*factory, nullptr, httpGateway, GetHTTPDefaultRetryPolicy(TDuration::Seconds(HTTPmaxTimeSeconds), maxRetriesCount), {}, nullptr); + RegisterS3WriteActorFactory(*factory, nullptr, httpGateway); + RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway); + RegisterGenericReadActorFactory(*factory, nullptr, genericClient); + + RegisterDqPqWriteActorFactory(*factory, driver, nullptr); + + return factory; +} + +NActors::NLog::EPriority YqlToActorsLogLevel(NYql::NLog::ELevel yqlLevel) { + switch (yqlLevel) { + case NYql::NLog::ELevel::FATAL: + return NActors::NLog::PRI_CRIT; + case NYql::NLog::ELevel::ERROR: + return NActors::NLog::PRI_ERROR; + case NYql::NLog::ELevel::WARN: + return NActors::NLog::PRI_WARN; + case NYql::NLog::ELevel::INFO: + return NActors::NLog::PRI_INFO; + case NYql::NLog::ELevel::DEBUG: + return NActors::NLog::PRI_DEBUG; + case NYql::NLog::ELevel::TRACE: + return NActors::NLog::PRI_TRACE; + default: + ythrow yexception() << "unexpected level: " << int(yqlLevel); + } +} + +struct TActorIds { + NActors::TActorId DatabaseResolver; + NActors::TActorId HttpProxy; +}; + +std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem( + const TGatewaysConfig& gatewaysConfig, + IMetricsRegistryPtr& metricsRegistry, + NYql::NLog::ELevel loggingLevel +) { + auto actorSystemManager = std::make_unique<TActorSystemManager>(metricsRegistry, YqlToActorsLogLevel(loggingLevel)); + TActorIds actorIds; + + // Run actor system only if necessary + auto 
needActorSystem = gatewaysConfig.HasGeneric(); + if (!needActorSystem) { + return std::make_tuple(std::move(actorSystemManager), std::move(actorIds)); + } + + // One can modify actor system setup via actorSystemManager->ApplySetupModifier(). + // TODO: https://st.yandex-team.ru/YQL-16131 + // This will be useful for DQ Gateway initialization refactoring. + actorSystemManager->Start(); + + // Actor system is initialized; start actor registration. + if (gatewaysConfig.HasGeneric()) { + auto httpProxy = NHttp::CreateHttpProxy(); + actorIds.HttpProxy = actorSystemManager->GetActorSystem()->Register(httpProxy); + + auto databaseResolver = NFq::CreateDatabaseResolver(actorIds.HttpProxy, nullptr); + actorIds.DatabaseResolver = actorSystemManager->GetActorSystem()->Register(databaseResolver); + } + + return std::make_tuple(std::move(actorSystemManager), std::move(actorIds)); +} + +int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<TString, TString>& clusters) { + bool fail = true; + if (options.Sql) { + Cout << "Parse SQL..." << Endl; + NSQLTranslation::TTranslationSettings sqlSettings; + sqlSettings.ClusterMapping = clusters; + sqlSettings.SyntaxVersion = 1; + sqlSettings.AnsiLexer = options.AnsiLexer; + sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; + sqlSettings.Flags.insert("DqEngineEnable"); + if (!options.AnalyzeQuery) { + sqlSettings.Flags.insert("DqEngineForce"); + } + + if (options.BindingsFile) { + TFileInput input(*options.BindingsFile); + LoadBindings(sqlSettings.Bindings, input.ReadAll()); + } + + fail = !program->ParseSql(sqlSettings); + } else { + Cout << "Parse YQL..." << Endl; + fail = !program->ParseYql(); + } + program->PrintErrorsTo(*options.ErrStream); + if (fail) { + return 1; + } + program->SetAbortHidden([](){ + Cout << "hidden pseudo-aborted" << Endl; + }); + Cout << "Compile program..." 
<< Endl; + fail = !program->Compile(options.User); + program->PrintErrorsTo(*options.ErrStream); + if (options.TraceOpt) { + program->Print(&Cerr, nullptr); + } + if (fail) { + return 1; + } + + TProgram::TStatus status = TProgram::TStatus::Error; + if (options.OptimizeOnly) { + Cout << "Optimize program..." << Endl; + auto config = TOptPipelineConfigurator(program, options.PrintPlan, options.TracePlan); + status = program->OptimizeWithConfig(options.User, config); + } else { + Cout << "Run program..." << Endl; + auto config = TOptPipelineConfigurator(program, options.PrintPlan, options.TracePlan); + status = program->RunWithConfig(options.User, config); + } + program->PrintErrorsTo(*options.ErrStream); + if (status == TProgram::TStatus::Error) { + if (options.TraceOpt) { + program->Print(&Cerr, nullptr); + } + return 1; + } + program->Print(options.ExprOut, options.TracePlan); + + Cout << "Getting results..." << Endl; + if (program->HasResults()) { + NYson::TYsonWriter yson(options.ResultOut, options.ResultsFormat); + yson.OnBeginList(); + for (const auto& result: program->Results()) { + yson.OnListItem(); + yson.OnRaw(result); + } + yson.OnEndList(); + } + + if (options.StatisticsStream) { + if (auto st = program->GetStatistics(true)) { + TStringInput in(*st); + NYson::ReformatYsonStream(&in, options.StatisticsStream, NYson::EYsonFormat::Pretty); + } + } + + Cout << Endl << "Done" << Endl; + return 0; +} + +int RunMain(int argc, const char* argv[]) +{ + TString gatewaysCfgFile; + TString progFile; + TVector<TString> tablesMappingList; + THashMap<TString, TString> tablesMapping; + TString user = GetUsername(); + TString format; + TVector<TString> filesMappingList; + TVector<TString> urlMappingList; + TString exprFile; + TString resultFile; + TString planFile; + TString errFile; + TString paramsFile; + TString fileStorageCfg; + TVector<TString> udfsPaths; + TString udfsDir; + TMaybe<TString> dqHost; + TMaybe<int> dqPort; + int threads = 16; + TString tmpDir; + 
const bool hasValidate = false; // todo + THashSet<TString> gatewayTypes; // yqlrun compat, unused + ui16 syntaxVersion; // yqlrun compat, unused + bool emulateOutputForMultirun = false; + THashMap<TString, TString> clusterMapping; + THashSet<TString> sqlFlags; + IMetricsRegistryPtr metricsRegistry = CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq)); + clusterMapping["plato"] = YtProviderName; + + TString mountConfig; + TString mestricsPusherConfig; + TString udfResolver; + bool udfResolverFilterSyscalls = false; + TString statFile; + TString metricsFile; + int verbosity = 3; + bool showLog = false; + bool emulateYt = false; + TString token = GetEnv("YQL_TOKEN"); + if (!token) { + TString home = GetEnv("HOME"); + auto tokenPath = TFsPath(home) / ".yql" / "token"; + if (tokenPath.Exists()) { + token = StripStringRight(TFileInput(tokenPath).ReadAll()); + } + } + THashMap<TString, TString> customTokens; + TString folderId; + + TRunOptions runOptions; + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('p', "program", "Program to execute (use '-' to read from stdin)") + .Required() + .RequiredArgument("FILE") + .StoreResult(&progFile); + opts.AddLongOption('s', "sql", "Program is SQL query") + .Optional() + .NoArgument() + .SetFlag(&runOptions.Sql); + opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList); + opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping)); + opts.AddLongOption('u', "user", "MR user") + .Optional() + .RequiredArgument("USER") + .StoreResult(&user); + opts.AddLongOption("format", "results format, one of { binary | text | pretty }") + .Optional() + .RequiredArgument("STR") + .DefaultValue("text") + .StoreResult(&format); + opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList); + opts.AddLongOption("url", "name@url").AppendTo(&urlMappingList); + 
opts.AddLongOption("gateways-cfg", "gateways configuration file") + .Optional() + .RequiredArgument("FILE") + .StoreResult(&gatewaysCfgFile); + opts.AddLongOption("fs-cfg", "Path to file storage config") + .Optional() + .StoreResult(&fileStorageCfg); + opts.AddLongOption("udf", "Load shared library with UDF by given path") + .AppendTo(&udfsPaths); + opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found" + " in given directory") + .StoreResult(&udfsDir); + opts.AddLongOption('O', "optimize", "optimize expression") + .Optional() + .NoArgument() + .SetFlag(&runOptions.OptimizeOnly); + opts.AddLongOption("peephole", "perform peephole optimization") + .Optional() + .NoArgument() + .SetFlag(&runOptions.PeepholeOnly); + opts.AddLongOption("trace-opt", "print AST in the begin of each transformation") + .Optional() + .NoArgument() + .SetFlag(&runOptions.TraceOpt); + opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument(); + opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult<TString>(&exprFile); + opts.AddLongOption("result-file", "print program execution result to file").StoreResult<TString>(&resultFile); + opts.AddLongOption("plan-file", "print program plan to file").StoreResult<TString>(&planFile); + opts.AddLongOption("err-file", "print validate/optimize/runtime errors to file").StoreResult<TString>(&errFile); + opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(&paramsFile); + opts.AddLongOption("tmp-dir", "directory for temporary tables").StoreResult<TString>(&tmpDir); + opts.AddLongOption('G', "gateways", "used gateways").SplitHandler(&gatewayTypes, ',').DefaultValue(DqProviderName); + opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ','); + opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&syntaxVersion).DefaultValue(1); + opts.AddLongOption('m', "mounts", "Mount points config 
file.").StoreResult(&mountConfig); + opts.AddLongOption('R',"run", "run expression using input/output tables").NoArgument(); // yqlrun compat + opts.AddLongOption('L', "show-log", "show transformation log") + .Optional() + .NoArgument() + .SetFlag(&showLog); + opts.AddLongOption("udf-resolver", "Path to udf-resolver") + .Optional() + .RequiredArgument("PATH") + .StoreResult(&udfResolver); + opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver") + .Optional() + .NoArgument() + .SetFlag(&udfResolverFilterSyscalls); + opts.AddLongOption('v', "verbosity", "Log verbosity level") + .Optional() + .RequiredArgument("LEVEL") + .DefaultValue("6") + .StoreResult(&verbosity); + opts.AddLongOption("token", "YQL token") + .Optional() + .RequiredArgument("VALUE") + .StoreResult(&token); + opts.AddLongOption("custom-tokens", "Custom tokens") + .Optional() + .RequiredArgument("NAME=VALUE or NAME=@PATH") + .KVHandler([&customTokens](TString key, TString value) { + if (value.StartsWith('@')) { + customTokens[key] = StripStringRight(TFileInput(value.substr(1)).ReadAll()); + } else { + customTokens[key] = value; + } + }); + opts.AddLongOption("folderId", "Yandex Cloud folder ID (resolve objects inside this folder)") + .Optional() + .RequiredArgument("VALUE") + .StoreResult(&folderId); + opts.AddLongOption("stat", "Print execution statistics") + .Optional() + .OptionalArgument("FILE") + .StoreResult(&statFile); + opts.AddLongOption("metrics", "Print execution metrics") + .Optional() + .OptionalArgument("FILE") + .StoreResult(&metricsFile); + opts.AddLongOption("print-plan", "Print basic and detailed plan") + .Optional() + .NoArgument() + .SetFlag(&runOptions.PrintPlan); + opts.AddLongOption("keep-temp", "keep temporary tables").NoArgument(); + opts.AddLongOption("analyze-query", "enable analyze query").Optional().NoArgument().SetFlag(&runOptions.AnalyzeQuery); + opts.AddLongOption("ansi-lexer", "Use ansi 
lexer").Optional().NoArgument().SetFlag(&runOptions.AnsiLexer); + opts.AddLongOption('E', "emulate-yt", "Emulate YT tables").Optional().NoArgument().SetFlag(&emulateYt); + + opts.AddLongOption("dq-host", "Dq Host"); + opts.AddLongOption("dq-port", "Dq Port"); + opts.AddLongOption("threads", "Threads"); + opts.AddLongOption("bindings-file", "Bindings File") + .StoreResult(&runOptions.BindingsFile); + opts.AddLongOption("metrics-pusher-config", "Metrics Pusher Config") + .StoreResult(&mestricsPusherConfig); + opts.AddHelpOption('h'); + + opts.SetFreeArgsNum(0); + + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + + if (runOptions.PeepholeOnly) { + Cerr << "Peephole optimization is not supported yet" << Endl; + return 1; + } + + if (res.Has("dq-host")) { + dqHost = res.Get<TString>("dq-host"); + } + if (res.Has("dq-port")) { + dqPort = res.Get<int>("dq-port"); + } + if (res.Has("threads")) { + threads = res.Get<int>("threads"); + } + + THolder<TFixedBufferFileOutput> exprFileHolder; + if (res.Has("print-expr")) { + runOptions.ExprOut = &Cout; + } else if (!exprFile.empty()) { + exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile)); + runOptions.ExprOut = exprFileHolder.Get(); + } + THolder<TFixedBufferFileOutput> errFileHolder; + if (!errFile.empty()) { + errFileHolder.Reset(new TFixedBufferFileOutput(errFile)); + runOptions.ErrStream = errFileHolder.Get(); + } + THolder<TFixedBufferFileOutput> resultFileHolder; + if (!resultFile.empty()) { + resultFileHolder.Reset(new TFixedBufferFileOutput(resultFile)); + runOptions.ResultOut = resultFileHolder.Get(); + } + THolder<TFixedBufferFileOutput> planFileHolder; + if (!planFile.empty()) { + planFileHolder.Reset(new TFixedBufferFileOutput(planFile)); + runOptions.TracePlan = planFileHolder.Get(); + } + + for (auto& s: tablesMappingList) { + TStringBuf tableName, filePath; + TStringBuf(s).Split('@', tableName, filePath); + if (tableName.empty() || filePath.empty()) { + Cerr << "Incorrect table mapping, expected 
form table@file, e.g. yt.plato.Input@input.txt" << Endl; + return 1; + } + tablesMapping[tableName] = filePath; + } + + // Reinit logger with new level + NYql::NLog::ELevel loggingLevel = NYql::NLog::ELevelHelpers::FromInt(verbosity); + if (verbosity != LOG_DEF_PRIORITY) { + NYql::NLog::EComponentHelpers::ForEach([loggingLevel](NYql::NLog::EComponent c) { + NYql::NLog::YqlLogger().SetComponentLevel(c, loggingLevel); + }); + } + + if (runOptions.TraceOpt) { + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE); + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CoreEval, NYql::NLog::ELevel::TRACE); + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CorePeepHole, NYql::NLog::ELevel::TRACE); + } else if (showLog) { + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::DEBUG); + } + + YQL_LOG(INFO) << "dqrun ABI version: " << NUdf::CurrentAbiVersionStr(); + + if (emulateYt && dqPort) { + YQL_LOG(ERROR) << "Remote DQ instance cannot work with the emulated YT cluster"; + return 1; + } + + runOptions.ResultsFormat = + (format == TStringBuf("binary")) ? NYson::EYsonFormat::Binary + : (format == TStringBuf("text")) ? 
NYson::EYsonFormat::Text + : NYson::EYsonFormat::Pretty; + + runOptions.User = user; + + TUserDataTable dataTable; + FillUsedFiles(filesMappingList, dataTable); + FillUsedUrls(urlMappingList, dataTable); + + NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths); + auto funcRegistry = NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); + NKikimr::NMiniKQL::FillStaticModules(*funcRegistry); + + TGatewaysConfig gatewaysConfig; + ReadGatewaysConfig(gatewaysCfgFile, &gatewaysConfig); + if (runOptions.AnalyzeQuery) { + auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings(); + setting->SetName("AnalyzeQuery"); + setting->SetValue("1"); + } + + TString defYtServer = gatewaysConfig.HasYt() ? NYql::TConfigClusters::GetDefaultYtServer(gatewaysConfig.GetYt()) : TString(); + auto storage = CreateFS(fileStorageCfg, defYtServer); + + TVector<TIntrusivePtr<TThrRefBase>> gateways; + THashMap<TString, TString> clusters; + TVector<TDataProviderInitializer> dataProvidersInit; + + const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(driverConfig); + + Y_DEFER { + driver.Stop(true); + }; + + TVector<NKikimr::NMiniKQL::TComputationNodeFactory> factories = { + GetDqYtFactory(), + GetDqYdbFactory(driver), + GetCommonDqFactory(), + NMiniKQL::GetYqlFactory(), + GetPgFactory() + }; + + if (emulateYt) { + auto ytFileServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, storage, tmpDir, res.Has("keep-temp")); + for (auto& cluster: gatewaysConfig.GetYt().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{YtProviderName}); + } + factories.push_back(GetYtFileFactory(ytFileServices)); + clusters["plato"] = YtProviderName; + auto ytNativeGateway = CreateYtFileGateway(ytFileServices, &emulateOutputForMultirun); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + } else if 
(gatewaysConfig.HasYt()) { + TYtNativeServices ytServices; + ytServices.FunctionRegistry = funcRegistry.Get(); + ytServices.FileStorage = storage; + ytServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt()); + auto ytNativeGateway = CreateYtNativeGateway(ytServices); + gateways.emplace_back(ytNativeGateway); + + for (auto& cluster: gatewaysConfig.GetYt().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{YtProviderName}); + } + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + } + + auto dqCompFactory = NMiniKQL::GetCompositeWithBuiltinFactory(factories); + + // Actor system starts here and will be automatically destroyed when goes out of the scope. + std::unique_ptr<TActorSystemManager> actorSystemManager; + TActorIds actorIds; + std::tie(actorSystemManager, actorIds) = RunActorSystem(gatewaysConfig, metricsRegistry, loggingLevel); + + IHTTPGateway::TPtr httpGateway; + if (gatewaysConfig.HasClickHouse()) { + for (auto& cluster: gatewaysConfig.GetClickHouse().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{ClickHouseProviderName}); + } + if (!httpGateway) { + httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? 
&gatewaysConfig.GetHttpGateway() : nullptr); + } + dataProvidersInit.push_back(GetClickHouseDataProviderInitializer(httpGateway)); + } + + NConnector::IClient::TPtr genericClient; + if (gatewaysConfig.HasGeneric()) { + for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{GenericProviderName}); + } + + genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector()); + auto dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( + actorSystemManager->GetActorSystem(), + actorIds.DatabaseResolver, + "", + gatewaysConfig.GetGeneric().GetMdbGateway(), + NFq::MakeMdbEndpointGeneratorGeneric(false) + ); + dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver)); + } + + if (gatewaysConfig.HasYdb()) { + for (auto& cluster: gatewaysConfig.GetYdb().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{YdbProviderName}); + } + dataProvidersInit.push_back(GetYdbDataProviderInitializer(driver)); + } + + if (gatewaysConfig.HasS3()) { + for (auto& cluster: gatewaysConfig.GetS3().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{S3ProviderName}); + } + if (!httpGateway) { + httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? 
&gatewaysConfig.GetHttpGateway() : nullptr); + } + dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true)); + } + + if (gatewaysConfig.HasPq()) { + TPqGatewayServices pqServices( + driver, + nullptr, + nullptr, // credentials factory + std::make_shared<TPqGatewayConfig>(gatewaysConfig.GetPq()), + funcRegistry.Get() + ); + auto pqGateway = CreatePqNativeGateway(pqServices); + gateways.emplace_back(pqGateway); + for (auto& cluster: gatewaysConfig.GetPq().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{PqProviderName}); + } + dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway)); + } + + if (gatewaysConfig.HasSolomon()) { + auto solomonConfig = gatewaysConfig.GetSolomon(); + auto solomonGateway = CreateSolomonGateway(solomonConfig); + + gateways.emplace_back(solomonGateway); + dataProvidersInit.push_back(NYql::GetSolomonDataProviderInitializer(solomonGateway, false)); + for (const auto& cluster: gatewaysConfig.GetSolomon().GetClusterMapping()) { + clusters.emplace(to_lower(cluster.GetName()), TString{NYql::SolomonProviderName}); + } + } + + std::function<NActors::IActor*(void)> metricsPusherFactory = {}; + + { + TIntrusivePtr<IDqGateway> dqGateway; + if (dqPort) { + dqGateway = CreateDqGateway(dqHost.GetOrElse("localhost"), *dqPort); + } else { + auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({ + CreateCommonDqTaskTransformFactory(), + CreateYtDqTaskTransformFactory(), + CreateYdbDqTaskTransformFactory() + }); + + TDqTaskPreprocessorFactoryCollection dqTaskPreprocessorFactories = { + NDq::CreateYtDqTaskPreprocessorFactory(emulateYt, funcRegistry) + }; + + size_t requestTimeout = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasRequestTimeoutSeconds() ? gatewaysConfig.GetHttpGateway().GetRequestTimeoutSeconds() : 100; + size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? 
gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2; + + dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, + CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, + metricsRegistry, + metricsPusherFactory); + } + + gateways.emplace_back(dqGateway); + dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage)); + } + + TExprContext ctx; + IModuleResolver::TPtr moduleResolver; + if (!mountConfig.empty()) { + TModulesTable modules; + NYqlMountConfig::TMountConfig mount; + Y_VERIFY(NKikimr::ParsePBFromFile(mountConfig, &mount)); + FillUserDataTableFromFileSystem(mount, dataTable); + + if (!CompileLibraries(dataTable, ctx, modules)) { + *runOptions.ErrStream << "Errors on compile libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*runOptions.ErrStream); + return -1; + } + + moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags, hasValidate); + } else { + if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusters)) { + *runOptions.ErrStream << "Errors loading default YQL libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*runOptions.ErrStream); + return 1; + } + } + + TExprContext::TFreezeGuard freezeGuard(ctx); + + TProgramFactory progFactory(emulateYt, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "dqrun"); + progFactory.AddUserDataTable(std::move(dataTable)); + progFactory.SetModules(moduleResolver); + if (udfResolver) { + progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage, + udfResolver, {}, {}, udfResolverFilterSyscalls, {})); + } else { + progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true)); + } + progFactory.SetFileStorage(storage); + progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig)); + 
progFactory.SetGatewaysConfig(&gatewaysConfig); + TCredentials::TPtr creds = MakeIntrusive<TCredentials>(); + if (token) { + if (!emulateYt) { + creds->AddCredential("default_yt", TCredential("yt", "", token)); + } + creds->AddCredential("default_ydb", TCredential("ydb", "", token)); + creds->AddCredential("default_pq", TCredential("pq", "", token)); + creds->AddCredential("default_s3", TCredential("s3", "", token)); + creds->AddCredential("default_solomon", TCredential("solomon", "", token)); + creds->AddCredential("default_generic", TCredential("generic", "", token)); + } + if (!customTokens.empty()) { + for (auto& [key, value]: customTokens) { + creds->AddCredential(key, TCredential("custom", "", value)); + } + } + progFactory.SetCredentials(creds); + + progFactory.SetUrlListerManager( + MakeUrlListerManager( + {MakeYtUrlLister()} + ) + ); + + TProgramPtr program; + if (progFile == TStringBuf("-")) { + program = progFactory.Create("-stdin-", Cin.ReadAll()); + } else { + program = progFactory.Create(TFile(progFile, RdOnly)); + program->SetQueryName(progFile); + } + if (paramsFile) { + TString parameters = TFileInput(paramsFile).ReadAll(); + program->SetParametersYson(parameters); + } + + if (!emulateYt) { + program->EnableResultPosition(); + } + + THolder<IOutputStream> statStreamHolder; + if (res.Has("stat")) { + if (statFile) { + statStreamHolder = MakeHolder<TFileOutput>(statFile); + runOptions.StatisticsStream = statStreamHolder.Get(); + } else { + runOptions.StatisticsStream = &Cerr; + } + } + + int result = RunProgram(std::move(program), runOptions, clusters); + if (res.Has("metrics")) { + NProto::TMetricsRegistrySnapshot snapshot; + snapshot.SetDontIncrement(true); + metricsRegistry->TakeSnapshot(&snapshot); + auto output = MakeHolder<TFileOutput>(metricsFile); + SerializeToTextFormat(snapshot, *output.Get()); + } + + return result; +} + +int main(int argc, const char* argv[]) +{ + Y_UNUSED(NUdf::GetStaticSymbols()); + 
NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + + NYT::Initialize(argc, argv); + + // Instead of hardcoding logging level, use CLI args: + // ./dqrun ... -v 6 <- INFO + // ./dqrun ... -v 7 <- DEBUG + // ./dqrun ... -v 8 <- TRACE + auto loggerConfig = NYql::NProto::TLoggingConfig(); + NYql::NLog::InitLogger(loggerConfig, false); + + auto oldBackend = NYql::NLog::YqlLogger().ReleaseBackend(); + NYql::NLog::YqlLogger().ResetBackend(THolder(new NYql::NLog::TTlsLogBackend(oldBackend))); + + //NYql::NLog::YqlLoggerScope logger(&Cerr); + + try { +#ifdef PROFILE_MEMORY_ALLOCATIONS + NAllocProfiler::StartAllocationSampling(true); +#endif + const int res = RunMain(argc, argv); +#ifdef PROFILE_MEMORY_ALLOCATIONS + NAllocProfiler::StopAllocationSampling(Cout); +#endif + return res; + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return 1; + } +} diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make new file mode 100644 index 00000000000..c904d4617a9 --- /dev/null +++ b/ydb/library/yql/tools/dqrun/ya.make @@ -0,0 +1,90 @@ +IF (NOT OS_WINDOWS) + PROGRAM() + +IF (PROFILE_MEMORY_ALLOCATIONS) + ALLOCATOR(LF_DBG) + CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS) +ELSE() + ALLOCATOR(J) +ENDIF() + + SRCS( + dqrun.cpp + ) + + PEERDIR( + contrib/libs/protobuf + ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs + library/cpp/actors/http + library/cpp/getopt + library/cpp/lfalloc/alloc_profiler + library/cpp/logger + library/cpp/resource + library/cpp/yson + yt/cpp/mapreduce/interface + ydb/library/yql/sql/pg + ydb/library/yql/core/facade + ydb/library/yql/core/file_storage + ydb/library/yql/core/file_storage/proto + ydb/library/yql/core/file_storage/http_download + ydb/library/yql/core/services + ydb/library/yql/core/services/mounts + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/minikql/comp_nodes/llvm + 
ydb/library/yql/minikql/invoke_builtins/llvm + ydb/library/yql/providers/clickhouse/actors + ydb/library/yql/providers/clickhouse/provider + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/udf_resolve + ydb/library/yql/providers/generic/actors + ydb/library/yql/providers/generic/provider + ydb/library/yql/providers/dq/local_gateway + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/dq/provider/exec + ydb/library/yql/providers/pq/async_io + ydb/library/yql/providers/pq/gateway/native + ydb/library/yql/providers/pq/provider + ydb/library/yql/providers/s3/actors + ydb/library/yql/providers/s3/provider + ydb/library/yql/providers/solomon/gateway + ydb/library/yql/providers/solomon/provider + ydb/library/yql/providers/ydb/actors + ydb/library/yql/providers/ydb/comp_nodes + ydb/library/yql/providers/ydb/provider + + ydb/library/yql/public/udf/service/terminate_policy + ydb/library/yql/utils/backtrace + ydb/library/yql/utils/bindings + ydb/library/yql/utils/log + ydb/library/yql/core/url_preprocessing + ydb/library/yql/core/url_lister + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/dq_task_preprocessor + ydb/library/yql/providers/yt/gateway/file + ydb/library/yql/providers/yt/gateway/native + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/providers/yt/provider + ydb/library/yql/providers/yt/lib/yt_download + ydb/library/yql/providers/yt/lib/yt_url_lister + ydb/library/yql/providers/yt/lib/config_clusters + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/utils/log/proto + + ydb/library/yql/utils/actor_system + ydb/core/fq/libs/actors + ydb/core/fq/libs/db_id_async_resolver_impl + + ydb/library/yql/udfs/common/clickhouse/client + ) + + YQL_LAST_ABI_VERSION() + + END() +ELSE() + LIBRARY() + + END() +ENDIF() diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make index a17f84573b4..2841fa69adc 100644 --- 
a/ydb/library/yql/tools/ya.make +++ b/ydb/library/yql/tools/ya.make @@ -1,6 +1,7 @@ RECURSE( astdiff + dqrun mrjob - yqlrun sql2yql + yqlrun ) |