aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-08-29 13:38:13 +0300
committervvvv <vvvv@ydb.tech>2023-08-29 14:04:53 +0300
commit0c53fce10d3b30ce19c626f8b97be491f17f0dfe (patch)
tree00dec69bd4f07cc6de8b4c6203a032b93f159690
parent0e130abb0980cee82bd511ac6f197a350e22c3a1 (diff)
downloadydb-0c53fce10d3b30ce19c626f8b97be491f17f0dfe.tar.gz
Moved dqrun
-rw-r--r--ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt26
-rw-r--r--ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt26
-rw-r--r--ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt26
-rw-r--r--ydb/library/yql/providers/dq/CMakeLists.txt24
-rw-r--r--ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt28
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt29
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt29
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/ya.make20
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp266
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h21
-rw-r--r--ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt41
-rw-r--r--ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt42
-rw-r--r--ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt42
-rw-r--r--ydb/library/yql/providers/dq/service/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp839
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.h52
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_session.cpp106
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_session.h57
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.cpp302
-rw-r--r--ydb/library/yql/providers/dq/service/interconnect_helpers.h51
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.cpp167
-rw-r--r--ydb/library/yql/providers/dq/service/service_node.h48
-rw-r--r--ydb/library/yql/providers/dq/service/ya.make33
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp31
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h18
-rw-r--r--ydb/library/yql/providers/dq/stats_collector/ya.make20
-rw-r--r--ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt19
-rw-r--r--ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt19
-rw-r--r--ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt19
-rw-r--r--ydb/library/yql/providers/yt/CMakeLists.txt19
-rw-r--r--ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt18
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt34
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt35
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt35
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make24
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp365
-rw-r--r--ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h10
-rw-r--r--ydb/library/yql/tools/CMakeLists.txt1
-rw-r--r--ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt94
-rw-r--r--ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt98
-rw-r--r--ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt99
-rw-r--r--ydb/library/yql/tools/dqrun/CMakeLists.txt17
-rw-r--r--ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt14
-rw-r--r--ydb/library/yql/tools/dqrun/dqrun.cpp932
-rw-r--r--ydb/library/yql/tools/dqrun/ya.make90
-rw-r--r--ydb/library/yql/tools/ya.make3
53 files changed, 4417 insertions, 26 deletions
diff --git a/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..fae483c5c08
--- /dev/null
+++ b/ydb/library/yql/providers/dq/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(actors)
+add_subdirectory(api)
+add_subdirectory(common)
+add_subdirectory(config)
+add_subdirectory(counters)
+add_subdirectory(expr_nodes)
+add_subdirectory(interface)
+add_subdirectory(local_gateway)
+add_subdirectory(mkql)
+add_subdirectory(opt)
+add_subdirectory(planner)
+add_subdirectory(provider)
+add_subdirectory(runtime)
+add_subdirectory(service)
+add_subdirectory(stats_collector)
+add_subdirectory(task_runner)
+add_subdirectory(task_runner_actor)
+add_subdirectory(worker_manager)
diff --git a/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..fae483c5c08
--- /dev/null
+++ b/ydb/library/yql/providers/dq/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(actors)
+add_subdirectory(api)
+add_subdirectory(common)
+add_subdirectory(config)
+add_subdirectory(counters)
+add_subdirectory(expr_nodes)
+add_subdirectory(interface)
+add_subdirectory(local_gateway)
+add_subdirectory(mkql)
+add_subdirectory(opt)
+add_subdirectory(planner)
+add_subdirectory(provider)
+add_subdirectory(runtime)
+add_subdirectory(service)
+add_subdirectory(stats_collector)
+add_subdirectory(task_runner)
+add_subdirectory(task_runner_actor)
+add_subdirectory(worker_manager)
diff --git a/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..fae483c5c08
--- /dev/null
+++ b/ydb/library/yql/providers/dq/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(actors)
+add_subdirectory(api)
+add_subdirectory(common)
+add_subdirectory(config)
+add_subdirectory(counters)
+add_subdirectory(expr_nodes)
+add_subdirectory(interface)
+add_subdirectory(local_gateway)
+add_subdirectory(mkql)
+add_subdirectory(opt)
+add_subdirectory(planner)
+add_subdirectory(provider)
+add_subdirectory(runtime)
+add_subdirectory(service)
+add_subdirectory(stats_collector)
+add_subdirectory(task_runner)
+add_subdirectory(task_runner_actor)
+add_subdirectory(worker_manager)
diff --git a/ydb/library/yql/providers/dq/CMakeLists.txt b/ydb/library/yql/providers/dq/CMakeLists.txt
index 8a270ca4455..f8b31df0c11 100644
--- a/ydb/library/yql/providers/dq/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/CMakeLists.txt
@@ -6,18 +6,12 @@
# original buildsystem will not be accepted.
-add_subdirectory(actors)
-add_subdirectory(api)
-add_subdirectory(common)
-add_subdirectory(config)
-add_subdirectory(counters)
-add_subdirectory(expr_nodes)
-add_subdirectory(interface)
-add_subdirectory(mkql)
-add_subdirectory(opt)
-add_subdirectory(planner)
-add_subdirectory(provider)
-add_subdirectory(runtime)
-add_subdirectory(task_runner)
-add_subdirectory(task_runner_actor)
-add_subdirectory(worker_manager)
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..8a270ca4455
--- /dev/null
+++ b/ydb/library/yql/providers/dq/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(actors)
+add_subdirectory(api)
+add_subdirectory(common)
+add_subdirectory(config)
+add_subdirectory(counters)
+add_subdirectory(expr_nodes)
+add_subdirectory(interface)
+add_subdirectory(mkql)
+add_subdirectory(opt)
+add_subdirectory(planner)
+add_subdirectory(provider)
+add_subdirectory(runtime)
+add_subdirectory(task_runner)
+add_subdirectory(task_runner_actor)
+add_subdirectory(worker_manager)
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..efe17d868c0
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-local_gateway)
+target_compile_options(providers-dq-local_gateway PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-local_gateway PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-utils
+ dq-actors-compute
+ providers-dq-provider
+ dq-api-protos
+ providers-dq-task_runner
+ providers-dq-worker_manager
+ providers-dq-service
+ providers-dq-stats_collector
+)
+target_sources(providers-dq-local_gateway PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+)
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..88a991c531a
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-local_gateway)
+target_compile_options(providers-dq-local_gateway PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-local_gateway PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-utils
+ dq-actors-compute
+ providers-dq-provider
+ dq-api-protos
+ providers-dq-task_runner
+ providers-dq-worker_manager
+ providers-dq-service
+ providers-dq-stats_collector
+)
+target_sources(providers-dq-local_gateway PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+)
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..88a991c531a
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-local_gateway)
+target_compile_options(providers-dq-local_gateway PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-local_gateway PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-utils
+ dq-actors-compute
+ providers-dq-provider
+ dq-api-protos
+ providers-dq-task_runner
+ providers-dq-worker_manager
+ providers-dq-service
+ providers-dq-stats_collector
+)
+target_sources(providers-dq-local_gateway PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+)
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt
new file mode 100644
index 00000000000..606ff46b4be
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/dq/local_gateway/ya.make b/ydb/library/yql/providers/dq/local_gateway/ya.make
new file mode 100644
index 00000000000..4b3afaeacbd
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ yql_dq_gateway_local.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/utils
+ ydb/library/yql/dq/actors/compute
+ ydb/library/yql/providers/dq/provider
+ ydb/library/yql/providers/dq/api/protos
+ ydb/library/yql/providers/dq/task_runner
+ ydb/library/yql/providers/dq/worker_manager
+ ydb/library/yql/providers/dq/service
+ ydb/library/yql/providers/dq/stats_collector
+)
+
+END()
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
new file mode 100644
index 00000000000..6c01407c04b
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -0,0 +1,266 @@
+#include "yql_dq_gateway_local.h"
+
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h>
+
+#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
+#include <ydb/library/yql/providers/dq/service/service_node.h>
+
+#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
+
+#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
+
+#include <ydb/library/yql/utils/range_walker.h>
+#include <ydb/library/yql/utils/bind_in_range.h>
+
+#include <library/cpp/messagebus/network.h>
+
+#include <util/system/env.h>
+#include <util/generic/size_literals.h>
+
+namespace NYql {
+
+using namespace NActors;
+using NDqs::MakeWorkerManagerActorID;
+
+class TLocalServiceHolder {
+public:
+ TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
+ IMetricsRegistryPtr metricsRegistry,
+ const std::function<IActor*(void)>& metricsPusherFactory)
+ : MetricsRegistry(metricsRegistry
+ ? metricsRegistry
+ : CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))
+ )
+ {
+ ui32 nodeId = 1;
+
+ TString hostName;
+ TString localAddress;
+ std::tie(hostName, localAddress) = NDqs::GetLocalAddress();
+
+ NDqs::TServiceNodeConfig config = {
+ nodeId,
+ localAddress,
+ hostName,
+ static_cast<ui16>(interconnectPort.Addr.GetPort()),
+ static_cast<ui16>(grpcPort.Addr.GetPort()),
+ 0, // mbus
+ interconnectPort.Socket.Get()->Release(),
+ grpcPort.Socket.Get()->Release(),
+ 1
+ };
+
+ ServiceNode = MakeHolder<TServiceNode>(
+ config,
+ threads,
+ MetricsRegistry);
+
+ auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB);
+ NDqs::TLocalWorkerManagerOptions lwmOptions;
+ lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, true);
+ lwmOptions.AsyncIoFactory = std::move(asyncIoFactory);
+ lwmOptions.FunctionRegistry = functionRegistry;
+ lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
+ lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
+ [=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& )
+ {
+ return lwmOptions.Factory->Get(task);
+ });
+ lwmOptions.Counters = NDqs::TWorkerManagerCounters(MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm"));
+ lwmOptions.DropTaskCountersOnFinish = false;
+ auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
+
+ ServiceNode->AddLocalService(
+ MakeWorkerManagerActorID(nodeId),
+ TActorSetupCmd(resman, TMailboxType::Simple, 0));
+
+ auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors());
+
+ auto actorSystem = ServiceNode->StartActorSystem();
+ if (metricsPusherFactory) {
+ actorSystem->Register(metricsPusherFactory());
+ }
+
+ actorSystem->Register(statsCollector);
+
+ ServiceNode->StartService(dqTaskPreprocessorFactories);
+ }
+
+ ~TLocalServiceHolder()
+ {
+ ServiceNode->Stop();
+ }
+
+private:
+ IMetricsRegistryPtr MetricsRegistry;
+ THolder<TServiceNode> ServiceNode;
+};
+
+class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalImpl>
+{
+ struct TRequest {
+ TString SessionId;
+ NDqs::TPlan Plan;
+ TVector<TString> Columns;
+ THashMap<TString, TString> SecureParams;
+ THashMap<TString, TString> GraphParams;
+ TDqSettings::TPtr Settings;
+ IDqGateway::TDqProgressWriter ProgressWriter;
+ THashMap<TString, TString> ModulesMapping;
+ bool Discard;
+ NThreading::TPromise<IDqGateway::TResult> Result;
+ };
+
+public:
+ TDqGatewayLocalImpl(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
+ : LocalService(std::move(localService))
+ , Gateway(gateway)
+ , DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE"))
+ { }
+
+ NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) {
+ return Gateway->OpenSession(sessionId, username);
+ }
+
+ void CloseSession(const TString& sessionId) {
+ return Gateway->CloseSession(sessionId);
+ }
+
+ NThreading::TFuture<IDqGateway::TResult>
+ ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+ const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+ const TDqSettings::TPtr& settings,
+ const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard)
+ {
+
+ NThreading::TFuture<IDqGateway::TResult> result;
+ {
+ TGuard<TMutex> lock(Mutex);
+ Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()});
+ result = Queue.back().Result;
+ }
+
+ TryExecuteNext();
+
+ return result;
+ }
+
+private:
+ void TryExecuteNext() {
+ TGuard<TMutex> lock(Mutex);
+ if (!Queue.empty() && (!DeterministicMode || Inflight == 0)) {
+ auto request = std::move(Queue.front()); Queue.pop_front();
+ Inflight++;
+ lock.Release();
+
+ auto weak = weak_from_this();
+
+ Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard)
+ .Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable {
+ try {
+ promise.SetValue(result.GetValue());
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ }
+
+ if (auto ptr = weak.lock()) {
+ {
+ TGuard<TMutex> lock(ptr->Mutex);
+ ptr->Inflight--;
+ }
+
+ ptr->TryExecuteNext();
+ }
+ });
+ }
+ }
+
+ THolder<TLocalServiceHolder> LocalService;
+ IDqGateway::TPtr Gateway;
+ const bool DeterministicMode;
+ TMutex Mutex;
+ TList<TRequest> Queue;
+ int Inflight = 0;
+};
+
+class TDqGatewayLocal : public IDqGateway {
+public:
+ TDqGatewayLocal(THolder<TLocalServiceHolder>&& localService, const IDqGateway::TPtr& gateway)
+ : Impl(std::make_shared<TDqGatewayLocalImpl>(std::move(localService), gateway))
+ {}
+
+ NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override {
+ return Impl->OpenSession(sessionId, username);
+ }
+
+ void CloseSession(const TString& sessionId) override {
+ return Impl->CloseSession(sessionId);
+ }
+
+ NThreading::TFuture<TResult>
+ ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+ const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+ const TDqSettings::TPtr& settings,
+ const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard) override
+ {
+ return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams,
+ settings, progressWriter, modulesMapping, discard);
+ }
+
+private:
+ std::shared_ptr<TDqGatewayLocalImpl> Impl;
+};
+
+THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
+ IMetricsRegistryPtr metricsRegistry,
+ const std::function<IActor*(void)>& metricsPusherFactory)
+{
+ return MakeHolder<TLocalServiceHolder>(functionRegistry,
+ compFactory,
+ taskTransformFactory,
+ dqTaskPreprocessorFactories,
+ interconnectPort,
+ grpcPort,
+ std::move(asyncIoFactory),
+ threads,
+ metricsRegistry,
+ metricsPusherFactory);
+}
+
+TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
+ IMetricsRegistryPtr metricsRegistry,
+ const std::function<IActor*(void)>& metricsPusherFactory)
+{
+ int startPort = 31337;
+ TRangeWalker<int> portWalker(startPort, startPort+100);
+ auto interconnectPort = BindInRange(portWalker)[1];
+ auto grpcPort = BindInRange(portWalker)[1];
+
+ return new TDqGatewayLocal(
+ CreateLocalServiceHolder(
+ functionRegistry,
+ compFactory,
+ taskTransformFactory,
+ dqTaskPreprocessorFactories,
+ interconnectPort,
+ grpcPort,
+ std::move(asyncIoFactory),
+ threads,
+ metricsRegistry,
+ metricsPusherFactory),
+ CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort()));
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
new file mode 100644
index 00000000000..b558fbeb7dd
--- /dev/null
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/providers/common/metrics/metrics_registry.h>
+
+namespace NActors {
+class IActor;
+}
+
+namespace NYql {
+
+TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16,
+ IMetricsRegistryPtr metricsRegistry = {},
+ const std::function<NActors::IActor*(void)>& metricsPusherFactory = {});
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..b1532b4f2eb
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-service)
+target_compile_options(providers-dq-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ library-cpp-build_info
+ cpp-grpc-server
+ grpc-server-actors
+ library-cpp-svnversion
+ cpp-threading-future
+ library-yql-sql
+ api-protos
+ providers-common-metrics
+ providers-dq-actors
+ dq-api-grpc
+ providers-dq-common
+ providers-dq-counters
+ providers-dq-interface
+ providers-dq-worker_manager
+ dq-worker_manager-interface
+)
+target_sources(providers-dq-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+)
diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..934db13e595
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,42 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-service)
+target_compile_options(providers-dq-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ library-cpp-build_info
+ cpp-grpc-server
+ grpc-server-actors
+ library-cpp-svnversion
+ cpp-threading-future
+ library-yql-sql
+ api-protos
+ providers-common-metrics
+ providers-dq-actors
+ dq-api-grpc
+ providers-dq-common
+ providers-dq-counters
+ providers-dq-interface
+ providers-dq-worker_manager
+ dq-worker_manager-interface
+)
+target_sources(providers-dq-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+)
diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..934db13e595
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,42 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-service)
+target_compile_options(providers-dq-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ library-cpp-build_info
+ cpp-grpc-server
+ grpc-server-actors
+ library-cpp-svnversion
+ cpp-threading-future
+ library-yql-sql
+ api-protos
+ providers-common-metrics
+ providers-dq-actors
+ dq-api-grpc
+ providers-dq-common
+ providers-dq-counters
+ providers-dq-interface
+ providers-dq-worker_manager
+ dq-worker_manager-interface
+)
+target_sources(providers-dq-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/grpc_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/service_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
+)
diff --git a/ydb/library/yql/providers/dq/service/CMakeLists.txt b/ydb/library/yql/providers/dq/service/CMakeLists.txt
new file mode 100644
index 00000000000..606ff46b4be
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
new file mode 100644
index 00000000000..cfe6b53f0b7
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -0,0 +1,839 @@
+#include "grpc_service.h"
+
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
+#include <ydb/library/yql/providers/dq/actors/executer_actor.h>
+#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h>
+#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
+#include <ydb/library/yql/providers/dq/actors/result_aggregator.h>
+#include <ydb/library/yql/providers/dq/actors/events.h>
+#include <ydb/library/yql/providers/dq/actors/task_controller.h>
+#include <ydb/library/yql/providers/dq/actors/graph_execution_events_actor.h>
+
+#include <ydb/library/yql/providers/dq/counters/counters.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
+
+//#include <yql/tools/yqlworker/dq/worker_manager/benchmark.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+
+#include <library/cpp/grpc/server/grpc_counters.h>
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/helpers/future_callback.h>
+#include <library/cpp/build_info/build_info.h>
+#include <library/cpp/svnversion/svnversion.h>
+
+#include <util/string/split.h>
+#include <util/system/env.h>
+
+namespace NYql::NDqs {
+ using namespace NYql::NDqs;
+ using namespace NKikimr;
+ using namespace NThreading;
+ using namespace NMonitoring;
+ using namespace NActors;
+
+ namespace {
+ NGrpc::ICounterBlockPtr BuildCB(TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, const TString& name) {
+ auto grpcCB = counters->GetSubgroup("rpc_name", name);
+ return MakeIntrusive<NGrpc::TCounterBlock>(
+ grpcCB->GetCounter("total", true),
+ grpcCB->GetCounter("infly", true),
+ grpcCB->GetCounter("notOkReq", true),
+ grpcCB->GetCounter("notOkResp", true),
+ grpcCB->GetCounter("reqBytes", true),
+ grpcCB->GetCounter("inflyReqBytes", true),
+ grpcCB->GetCounter("resBytes", true),
+ grpcCB->GetCounter("notAuth", true),
+ grpcCB->GetCounter("resExh", true),
+ grpcCB);
+ }
+
+ template<typename RequestType, typename ResponseType>
+ class TServiceProxyActor: public TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>> {
+ public:
+ static constexpr char ActorName[] = "SERVICE_PROXY";
+ static constexpr char RetryName[] = "OperationRetry";
+
+ explicit TServiceProxyActor(
+ NGrpc::IRequestContextBase* ctx,
+ const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
+ const TString& traceId, const TString& username)
+ : TSynchronizableRichActor<TServiceProxyActor<RequestType, ResponseType>>(&TServiceProxyActor::Handler)
+ , Ctx(ctx)
+ , Counters(counters)
+ , ServiceProxyActorCounters(counters->GetSubgroup("component", "ServiceProxyActor"))
+ , ClientDisconnectedCounter(ServiceProxyActorCounters->GetCounter("ClientDisconnected", /*derivative=*/ true))
+ , RetryCounter(ServiceProxyActorCounters->GetCounter(RetryName, /*derivative=*/ true))
+ , FallbackCounter(ServiceProxyActorCounters->GetCounter("Fallback", /*derivative=*/ true))
+ , ErrorCounter(ServiceProxyActorCounters->GetCounter("UnrecoverableError", /*derivative=*/ true))
+ , Request(dynamic_cast<const RequestType*>(ctx->GetRequest()))
+ , TraceId(traceId)
+ , Username(username)
+ , Promise(NewPromise<void>())
+ {
+ Settings->Dispatch(Request->GetSettings());
+ Settings->FreezeDefaults();
+
+ MaxRetries = Settings->MaxRetries.Get().GetOrElse(MaxRetries);
+ RetryBackoff = TDuration::MilliSeconds(Settings->RetryBackoffMs.Get().GetOrElse(RetryBackoff.MilliSeconds()));
+ }
+
+ STRICT_STFUNC(Handler, {
+ HFunc(TEvQueryResponse, OnReturnResult);
+ cFunc(TEvents::TEvPoison::EventType, OnPoison);
+ SFunc(TEvents::TEvBootstrap, DoBootstrap);
+ hFunc(TEvDqStats, Handle);
+ })
+
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
+ return new IEventHandle(self, parentId, new TEvents::TEvBootstrap(), 0);
+ }
+
+ void OnPoison() {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ ;
+ ReplyError(grpc::UNAVAILABLE, "Unexpected error");
+ *ClientDisconnectedCounter += 1;
+ }
+
+ void Handle(TEvDqStats::TPtr&) {
+ // Do nothing
+ }
+
+ void DoPassAway() override {
+ Promise.SetValue();
+ }
+
+ void DoBootstrap(const NActors::TActorContext& ctx) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ if (!CtxSubscribed) {
+ auto selfId = ctx.SelfID;
+ auto* actorSystem = ctx.ExecutorThread.ActorSystem;
+ Ctx->GetFinishFuture().Subscribe([selfId, actorSystem](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) {
+ Y_VERIFY(future.HasValue());
+ if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) {
+ actorSystem->Send(selfId, new TEvents::TEvPoison());
+ }
+ });
+ CtxSubscribed = true;
+ }
+ Bootstrap();
+ }
+
+ virtual void Bootstrap() = 0;
+
+ void SendResponse(TEvQueryResponse::TPtr& ev)
+ {
+ auto& result = ev->Get()->Record;
+ Yql::DqsProto::ExecuteQueryResult queryResult;
+ queryResult.Mutableresult()->CopyFrom(result.resultset());
+ queryResult.set_yson(result.yson());
+
+ auto statusCode = result.GetStatusCode();
+ // this code guarantees that query will be considered failed unless the status is SUCCESS
+ // fallback may be performed as an extra measure
+ if (statusCode != NYql::NDqProto::StatusIds::SUCCESS) {
+ YQL_CLOG(ERROR, ProviderDq) << "Query is considered FAILED, status=" << static_cast<int>(statusCode);
+ NYql::TIssue rootIssue("Fatal Error");
+ rootIssue.SetCode(NCommon::NeedFallback(statusCode) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR);
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(result.GetIssues(), issues);
+ for (const auto& issue: issues) {
+ rootIssue.AddSubIssue(MakeIntrusive<TIssue>(issue));
+ }
+ result.MutableIssues()->Clear();
+ NYql::IssuesToMessage({rootIssue}, result.MutableIssues());
+ }
+
+ for (const auto& [k, v] : QueryStat.Get()) {
+ std::map<TString, TString> labels;
+ TString prefix, name;
+ if (NCommon::ParseCounterName(&prefix, &labels, &name, k)) {
+ if (prefix == "Actor") {
+ auto group = Counters->GetSubgroup("counters", "Actor");
+ for (const auto& [k, v] : labels) {
+ group = group->GetSubgroup(k, v);
+ }
+ group->GetHistogram(name, ExponentialHistogram(10, 2, 50))->Collect(v.Sum);
+ }
+ }
+ }
+
+ if (Settings->AggregateStatsByStage.Get().GetOrElse(TDqSettings::TDefault::AggregateStatsByStage)) {
+ auto aggregatedQueryStat = AggregateQueryStatsByStage(QueryStat, Task2Stage);
+ aggregatedQueryStat.FlushCounters(ResponseBuffer);
+ } else {
+ QueryStat.FlushCounters(ResponseBuffer);
+ }
+
+ auto& operation = *ResponseBuffer.mutable_operation();
+ operation.Setready(true);
+ operation.Mutableresult()->PackFrom(queryResult);
+ *operation.Mutableissues() = result.GetIssues();
+ ResponseBuffer.SetTruncated(result.GetTruncated());
+
+ Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0);
+ }
+
+ virtual void DoRetry()
+ {
+ this->CleanupChildren();
+ auto selfId = this->SelfId();
+ TActivationContext::Schedule(RetryBackoff, new IEventHandle(selfId, selfId, new TEvents::TEvBootstrap(), 0));
+ Retry += 1;
+ *RetryCounter +=1 ;
+ }
+
+ void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ auto& result = ev->Get()->Record;
+ Y_UNUSED(ctx);
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size();
+ QueryStat.AddCounters(result);
+
+ auto statusCode = result.GetStatusCode();
+ if ((statusCode != NYql::NDqProto::StatusIds::SUCCESS || result.GetIssues().size() > 0) && NCommon::IsRetriable(statusCode)) {
+ if (Retry < MaxRetries) {
+ QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0));
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(result.GetIssues(), issues);
+ YQL_CLOG(WARN, ProviderDq) << RetryName << " " << Retry << " Issues: " << issues.ToString();
+ DoRetry();
+ } else {
+ YQL_CLOG(ERROR, ProviderDq) << "Retries limit exceeded, status= " << static_cast<int>(ev->Get()->Record.GetStatusCode());
+ if (statusCode == NYql::NDqProto::StatusIds::SUCCESS) {
+ ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::INTERNAL_ERROR);
+ }
+ SendResponse(ev);
+ }
+ } else {
+ SendResponse(ev);
+ }
+ }
+
+ TFuture<void> GetFuture() {
+ return Promise.GetFuture();
+ }
+
+ virtual void ReplyError(grpc::StatusCode code, const TString& msg) {
+ Ctx->ReplyError(code, msg);
+ this->PassAway();
+ }
+
+ virtual void Reply(ui32 status, bool hasIssues) {
+ Y_UNUSED(hasIssues);
+ Ctx->Reply(&ResponseBuffer, status);
+ this->PassAway();
+ }
+
+ private:
+ NGrpc::IRequestContextBase* Ctx;
+ bool CtxSubscribed = false;
+ ResponseType ResponseBuffer;
+
+ protected:
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ TIntrusivePtr<NMonitoring::TDynamicCounters> ServiceProxyActorCounters;
+ TDynamicCounters::TCounterPtr ClientDisconnectedCounter;
+ TDynamicCounters::TCounterPtr RetryCounter;
+ TDynamicCounters::TCounterPtr FallbackCounter;
+ TDynamicCounters::TCounterPtr ErrorCounter;
+
+ const RequestType* Request;
+ const TString TraceId;
+ const TString Username;
+ TPromise<void> Promise;
+ const TInstant RequestStartTime = TInstant::Now();
+
+ TDqConfiguration::TPtr Settings = MakeIntrusive<TDqConfiguration>();
+
+ int Retry = 0;
+ int MaxRetries = 10;
+ TDuration RetryBackoff = TDuration::MilliSeconds(1000);
+
+ NYql::TCounters QueryStat;
+ THashMap<ui64, ui64> Task2Stage;
+
+ void RestoreRequest() {
+ Request = dynamic_cast<const RequestType*>(Ctx->GetRequest());
+ }
+ };
+
+ class TExecuteGraphProxyActor: public TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse> {
+ public:
+ using TBase = TServiceProxyActor<Yql::DqsProto::ExecuteGraphRequest, Yql::DqsProto::ExecuteGraphResponse>;
+ TExecuteGraphProxyActor(NGrpc::IRequestContextBase* ctx,
+ const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
+ const TString& traceId, const TString& username,
+ const NActors::TActorId& graphExecutionEventsActorId)
+ : TServiceProxyActor(ctx, counters, traceId, username)
+ , GraphExecutionEventsActorId(graphExecutionEventsActorId)
+ {
+ }
+
+ void DoRetry() override {
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
+ SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this](const auto& ev) {
+ if (ev->Get()->Record.GetErrorMessage()) {
+ TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
+ } else {
+ RestoreRequest();
+ ModifiedRequest.Reset();
+ TBase::DoRetry();
+ }
+ });
+ }
+
+ void Reply(ui32 status, bool hasIssues) override {
+ auto eventType = hasIssues
+ ? NYql::NDqProto::EGraphExecutionEventType::FAIL
+ : NYql::NDqProto::EGraphExecutionEventType::SUCCESS;
+ SendEvent(eventType, nullptr, [this, status, hasIssues](const auto& ev) {
+ if (!hasIssues && ev->Get()->Record.GetErrorMessage()) {
+ TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
+ } else {
+ TBase::Reply(status, hasIssues);
+ }
+ });
+ }
+
+ void ReplyError(grpc::StatusCode code, const TString& msg) override {
+ SendEvent(NYql::NDqProto::EGraphExecutionEventType::FAIL, nullptr, [this, code, msg](const auto& ev) {
+ Y_UNUSED(ev);
+ TBase::ReplyError(code, msg);
+ });
+ }
+
+ private:
+ THolder<Yql::DqsProto::ExecuteGraphRequest> ModifiedRequest;
+
+ void DoPassAway() override {
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
+ Send(GraphExecutionEventsActorId, new TEvents::TEvPoison());
+ TServiceProxyActor::DoPassAway();
+ }
+
+ NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor SerializeGraphDescriptor() const {
+ NDqProto::TGraphExecutionEvent::TExecuteGraphDescriptor result;
+
+ for (const auto& kv : Request->GetSecureParams()) {
+ result.MutableSecureParams()->MutableData()->insert(kv);
+ }
+
+ for (const auto& kv : Request->GetGraphParams()) {
+ result.MutableGraphParams()->MutableData()->insert(kv);
+ }
+
+ return result;
+ }
+
+ void Bootstrap() override {
+ YQL_CLOG(DEBUG, ProviderDq) << "TServiceProxyActor::OnExecuteGraph";
+
+ SendEvent(NYql::NDqProto::EGraphExecutionEventType::START, SerializeGraphDescriptor(), [this](const TEvGraphExecutionEvent::TPtr& ev) {
+ if (ev->Get()->Record.GetErrorMessage()) {
+ TBase::ReplyError(grpc::UNAVAILABLE, ev->Get()->Record.GetErrorMessage());
+ } else {
+ NDqProto::TGraphExecutionEvent::TMap params;
+ ev->Get()->Record.GetMessage().UnpackTo(&params);
+ FinishBootstrap(params);
+ }
+ });
+ }
+
+ void MergeTaskMetas(const NDqProto::TGraphExecutionEvent::TMap& params) {
+ if (!params.data().empty()) {
+ for (size_t i = 0; i < Request->TaskSize(); ++i) {
+ if (!ModifiedRequest) {
+ ModifiedRequest.Reset(new Yql::DqsProto::ExecuteGraphRequest());
+ ModifiedRequest->CopyFrom(*Request);
+ }
+
+ auto* task = ModifiedRequest->MutableTask(i);
+
+ Yql::DqsProto::TTaskMeta taskMeta;
+ task->GetMeta().UnpackTo(&taskMeta);
+
+ for (const auto&[key, value] : params.data()) {
+ (*taskMeta.MutableTaskParams())[key] = value;
+ }
+
+ task->MutableMeta()->PackFrom(taskMeta);
+ }
+ }
+
+ if (ModifiedRequest) {
+ Request = ModifiedRequest.Get();
+ }
+ }
+
+ void FinishBootstrap(const NDqProto::TGraphExecutionEvent::TMap& params) {
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
+ MergeTaskMetas(params);
+
+ auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
+
+ TVector<TString> columns;
+ columns.reserve(Request->GetColumns().size());
+ for (const auto& column : Request->GetColumns()) {
+ columns.push_back(column);
+ }
+ for (const auto& task : Request->GetTask()) {
+ Yql::DqsProto::TTaskMeta taskMeta;
+ task.GetMeta().UnpackTo(&taskMeta);
+ Task2Stage[task.GetId()] = taskMeta.GetStageId();
+ }
+ THashMap<TString, TString> secureParams;
+ for (const auto& x : Request->GetSecureParams()) {
+ secureParams[x.first] = x.second;
+ }
+ auto resultId = RegisterChild(NExecutionHelpers::MakeResultAggregator(
+ columns,
+ executerId,
+ TraceId,
+ secureParams,
+ Settings,
+ Request->GetResultType(),
+ Request->GetDiscard(),
+ GraphExecutionEventsActorId).Release());
+ auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? resultId
+ : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYql::NCommon::TServiceCounters(Counters, nullptr, "")).Release());
+ Send(executerId, MakeHolder<TEvGraphRequest>(
+ *Request,
+ controlId,
+ resultId));
+ }
+
+ template <class TPayload, class TCallback>
+ void SendEvent(NYql::NDqProto::EGraphExecutionEventType eventType, const TPayload& payload, TCallback callback) {
+ NDqProto::TGraphExecutionEvent record;
+ record.SetEventType(eventType);
+ if constexpr (!std::is_same_v<TPayload, std::nullptr_t>) {
+ record.MutableMessage()->PackFrom(payload);
+ }
+ Send(GraphExecutionEventsActorId, new TEvGraphExecutionEvent(record));
+ Synchronize<TEvGraphExecutionEvent>([callback, traceId = TraceId](TEvGraphExecutionEvent::TPtr& ev) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(traceId);
+ Y_VERIFY(ev->Get()->Record.GetEventType() == NYql::NDqProto::EGraphExecutionEventType::SYNC);
+ callback(ev);
+ });
+ }
+
+ NActors::TActorId GraphExecutionEventsActorId;
+ };
+
+ TString GetVersionString() {
+ TStringBuilder sb;
+ sb << GetProgramSvnVersion() << "\n";
+ sb << GetBuildInfo();
+ TString sandboxTaskId = GetSandboxTaskId();
+ if (sandboxTaskId != TString("0")) {
+ sb << "\nSandbox task id: " << sandboxTaskId;
+ }
+
+ return sb;
+ }
+ }
+
+ TDqsGrpcService::TDqsGrpcService(
+ NActors::TActorSystem& system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories)
+ : ActorSystem(system)
+ , Counters(std::move(counters))
+ , DqTaskPreprocessorFactories(dqTaskPreprocessorFactories)
+ , Promise(NewPromise<void>())
+ , RunningRequests(0)
+ , Stopping(false)
+ , Sessions(&system, Counters->GetSubgroup("component", "grpc")->GetCounter("Sessions"))
+ { }
+
+#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
+ do { \
+ MakeIntrusive<NGrpc::TGRpcRequest<Yql::DqsProto::IN, Yql::DqsProto::OUT, TDqsGrpcService>>( \
+ this, \
+ &Service_, \
+ CQ, \
+ [this](NGrpc::IRequestContextBase* ctx) { ACTION }, \
+ &Yql::DqsProto::DqService::AsyncService::Request##NAME, \
+ #NAME, \
+ logger, \
+ BuildCB(Counters, #NAME)) \
+ ->Run(); \
+ } while (0)
+
+ void TDqsGrpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
+ using namespace google::protobuf;
+
+ CQ = cq;
+
+ using TDqTaskPreprocessorCollection = std::vector<NYql::IDqTaskPreprocessor::TPtr>;
+
+ ADD_REQUEST(ExecuteGraph, ExecuteGraphRequest, ExecuteGraphResponse, {
+ TGuard<TMutex> lock(Mutex);
+ if (Stopping) {
+ ctx->ReplyError(grpc::UNAVAILABLE, "Terminating in progress");
+ return;
+ }
+ auto* request = dynamic_cast<const Yql::DqsProto::ExecuteGraphRequest*>(ctx->GetRequest());
+ auto session = Sessions.GetSession(request->GetSession());
+ if (!session) {
+ TString message = TStringBuilder()
+ << "Bad session: "
+ << request->GetSession();
+ YQL_CLOG(DEBUG, ProviderDq) << message;
+ ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
+ return;
+ }
+
+ TDqTaskPreprocessorCollection taskPreprocessors;
+ for (const auto& factory: DqTaskPreprocessorFactories) {
+ taskPreprocessors.push_back(factory());
+ }
+
+ auto graphExecutionEventsActorId = ActorSystem.Register(NDqs::MakeGraphExecutionEventsActor(request->GetSession(), std::move(taskPreprocessors)));
+
+ RunningRequests++;
+ auto actor = MakeHolder<TExecuteGraphProxyActor>(ctx, Counters, request->GetSession(), session->GetUsername(), graphExecutionEventsActorId);
+ auto future = actor->GetFuture();
+ auto actorId = ActorSystem.Register(actor.Release());
+ future.Apply([session, actorId, this] (const TFuture<void>&) mutable {
+ RunningRequests--;
+ if (Stopping && !RunningRequests) {
+ Promise.SetValue();
+ }
+
+ session->DeleteRequest(actorId);
+ });
+ session->AddRequest(actorId);
+ });
+
+ ADD_REQUEST(SvnRevision, SvnRevisionRequest, SvnRevisionResponse, {
+ Y_UNUSED(this);
+ Yql::DqsProto::SvnRevisionResponse result;
+ result.SetRevision(GetVersionString());
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ });
+
+ ADD_REQUEST(CloseSession, CloseSessionRequest, CloseSessionResponse, {
+ Y_UNUSED(this);
+ auto* request = dynamic_cast<const Yql::DqsProto::CloseSessionRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ Yql::DqsProto::CloseSessionResponse result;
+ Sessions.CloseSession(request->GetSession());
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ });
+
+ ADD_REQUEST(PingSession, PingSessionRequest, PingSessionResponse, {
+ Y_UNUSED(this);
+ auto* request = dynamic_cast<const Yql::DqsProto::PingSessionRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ YQL_CLOG(TRACE, ProviderDq) << "PingSession " << request->GetSession();
+
+ Yql::DqsProto::PingSessionResponse result;
+ auto session = Sessions.GetSession(request->GetSession());
+ if (!session) {
+ TString message = TStringBuilder()
+ << "Bad session: "
+ << request->GetSession();
+ YQL_CLOG(DEBUG, ProviderDq) << message;
+ ctx->ReplyError(grpc::INVALID_ARGUMENT, message);
+ } else {
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ }
+ });
+
+ ADD_REQUEST(OpenSession, OpenSessionRequest, OpenSessionResponse, {
+ Y_UNUSED(this);
+ auto* request = dynamic_cast<const Yql::DqsProto::OpenSessionRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ YQL_CLOG(DEBUG, ProviderDq) << "OpenSession for " << request->GetSession() << " " << request->GetUsername();
+
+ Yql::DqsProto::OpenSessionResponse result;
+ if (Sessions.OpenSession(request->GetSession(), request->GetUsername())) {
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ } else {
+ ctx->ReplyError(grpc::INVALID_ARGUMENT, "Session `" + request->GetSession() + "' exists");
+ }
+ });
+
+ ADD_REQUEST(JobStop, JobStopRequest, JobStopResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::JobStopRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ auto ev = MakeHolder<TEvJobStop>(*request);
+
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::JobStopResponse>(ctx->GetArena());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+
+ ActorSystem.Send(MakeWorkerManagerActorID(ActorSystem.NodeId), ev.Release());
+ });
+
+ ADD_REQUEST(ClusterStatus, ClusterStatusRequest, ClusterStatusResponse, {
+ auto ev = MakeHolder<TEvClusterStatus>();
+
+ using ResultEv = TEvClusterStatusResponse;
+
+ TExecutorPoolStats poolStats;
+
+ TExecutorThreadStats stat;
+ TVector<TExecutorThreadStats> stats;
+ ActorSystem.GetPoolStats(0, poolStats, stats);
+
+ for (const auto& s : stats) {
+ stat.Aggregate(s);
+ }
+
+ YQL_CLOG(DEBUG, ProviderDq) << "SentEvents: " << stat.SentEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "ReceivedEvents: " << stat.ReceivedEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "NonDeliveredEvents: " << stat.NonDeliveredEvents;
+ YQL_CLOG(DEBUG, ProviderDq) << "EmptyMailboxActivation: " << stat.EmptyMailboxActivation;
+ Sessions.PrintInfo();
+
+ for (ui32 i = 0; i < stat.ActorsAliveByActivity.size(); i=i+1) {
+ if (stat.ActorsAliveByActivity[i]) {
+ YQL_CLOG(DEBUG, ProviderDq) << "ActorsAliveByActivity[" << i << "]=" << stat.ActorsAliveByActivity[i];
+ }
+ }
+
+ auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>(
+ [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ClusterStatusResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "ClusterStatus failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(2000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
+ });
+
+ ADD_REQUEST(OperationStop, OperationStopRequest, OperationStopResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::OperationStopRequest*>(ctx->GetRequest());
+ auto ev = MakeHolder<TEvOperationStop>(*request);
+
+ auto callback = MakeHolder<TRichActorFutureCallback<TEvOperationStopResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvOperationStopResponse>>& event) mutable {
+ Y_UNUSED(event);
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::OperationStopResponse>(ctx->GetArena());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "OperationStopResponse failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(2000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
+ });
+
+ ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest());
+
+ auto ev = MakeHolder<TEvQueryStatus>(*request);
+
+ auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "QueryStatus failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(2000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
+ });
+
+ ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ if (!request->GetPort()
+ || request->GetRole().empty()
+ || request->GetAddress().empty()
+ || request->GetRevision().empty())
+ {
+ ctx->ReplyError(grpc::INVALID_ARGUMENT, "Invalid argument");
+ return;
+ }
+
+ auto ev = MakeHolder<TEvRegisterNode>(*request);
+
+ using ResultEv = TEvRegisterNodeResponse;
+
+ auto callback = MakeHolder<TRichActorFutureCallback<ResultEv>>(
+ [ctx] (TAutoPtr<TEventHandle<ResultEv>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::RegisterNodeResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "RegisterNode failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(5000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
+ });
+
+ ADD_REQUEST(GetMaster, GetMasterRequest, GetMasterResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::GetMasterRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ auto requestEvent = MakeHolder<TEvGetMasterRequest>();
+
+ auto callback = MakeHolder<TActorFutureCallback<TEvGetMasterResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvGetMasterResponse>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::GetMasterResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ });
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release()));
+ });
+
+ ADD_REQUEST(ConfigureFailureInjector, ConfigureFailureInjectorRequest, ConfigureFailureInjectorResponse,{
+ auto* request = dynamic_cast<const Yql::DqsProto::ConfigureFailureInjectorRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ auto requestEvent = MakeHolder<TEvConfigureFailureInjectorRequest>(*request);
+
+ auto callback = MakeHolder<TActorFutureCallback<TEvConfigureFailureInjectorResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvConfigureFailureInjectorResponse>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::ConfigureFailureInjectorResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ });
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release()));
+ });
+
+ ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ auto ev = MakeHolder<TEvIsReady>(*request);
+
+ auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable {
+ Yql::DqsProto::IsReadyResponse result;
+ result.SetIsReady(event->Get()->Record.GetIsReady());
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "IsReadyForRevision failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(2000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release()));
+ });
+
+ ADD_REQUEST(Routes, RoutesRequest, RoutesResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::RoutesRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!request);
+
+ auto ev = MakeHolder<TEvRoutesRequest>();
+
+ auto callback = MakeHolder<TRichActorFutureCallback<TEvRoutesResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvRoutesResponse>>& event) mutable {
+ Yql::DqsProto::RoutesResponse result;
+ result.MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(&result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_CLOG(INFO, ProviderDq) << "Routes failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(5000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
+ ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(request->GetNodeId()), callbackId, ev.Release()));
+ });
+
+/* 1. move grpc to providers/dq, 2. move benchmark to providers/dq 3. uncomment
+ ADD_REQUEST(Benchmark, BenchmarkRequest, BenchmarkResponse, {
+ auto* req = dynamic_cast<const Yql::DqsProto::BenchmarkRequest*>(ctx->GetRequest());
+ Y_VERIFY(!!req);
+
+ TWorkerManagerBenchmarkOptions options;
+ if (req->GetWorkerCount()) {
+ options.WorkerCount = req->GetWorkerCount();
+ }
+ if (req->GetInflight()) {
+ options.Inflight = req->GetInflight();
+ }
+ if (req->GetTotalRequests()) {
+ options.TotalRequests = req->GetTotalRequests();
+ }
+ if (req->GetMaxRunTimeMs()) {
+ options.MaxRunTimeMs = TDuration::MilliSeconds(req->GetMaxRunTimeMs());
+ }
+
+ auto benchmarkId = ActorSystem.Register(
+ CreateWorkerManagerBenchmark(
+ MakeWorkerManagerActorID(ActorSystem.NodeId), options
+ ));
+
+ ActorSystem.Send(benchmarkId, new TEvents::TEvBootstrap);
+
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::BenchmarkResponse>(ctx->GetArena());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ });
+*/
+ }
+
+ void TDqsGrpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter = limiter;
+ }
+
+ bool TDqsGrpcService::IncRequest() {
+ return Limiter->Inc();
+ }
+
+ void TDqsGrpcService::DecRequest() {
+ Limiter->Dec();
+ Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
+ }
+
+ TFuture<void> TDqsGrpcService::Stop() {
+ TGuard<TMutex> lock(Mutex);
+ Stopping = true;
+ auto future = Promise.GetFuture();
+ if (RunningRequests == 0) {
+ Promise.SetValue();
+ }
+ return future;
+ }
+}
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h
new file mode 100644
index 00000000000..fa2a835c8c6
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/grpc_service.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
+
+#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h>
+#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
+
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+
+#include <library/cpp/grpc/server/grpc_request.h>
+#include <library/cpp/grpc/server/grpc_server.h>
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/threading/future/future.h>
+
+#include "grpc_session.h"
+
+namespace NYql::NDqs {
+ class TDatabaseManager;
+
+ class TDqsGrpcService: public NGrpc::TGrpcServiceBase<Yql::DqsProto::DqService> {
+ public:
+ TDqsGrpcService(NActors::TActorSystem& system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+
+ bool IncRequest();
+ void DecRequest();
+
+ NThreading::TFuture<void> Stop();
+
+ private:
+ NActors::TActorSystem& ActorSystem;
+ grpc::ServerCompletionQueue* CQ = nullptr;
+ NGrpc::TGlobalLimiter* Limiter = nullptr;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
+ TMutex Mutex;
+ NThreading::TPromise<void> Promise;
+ std::atomic<ui64> RunningRequests;
+ std::atomic<bool> Stopping;
+
+ TSessionStorage Sessions;
+ };
+}
diff --git a/ydb/library/yql/providers/dq/service/grpc_session.cpp b/ydb/library/yql/providers/dq/service/grpc_session.cpp
new file mode 100644
index 00000000000..2de87f9d46a
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/grpc_session.cpp
@@ -0,0 +1,106 @@
+#include "grpc_session.h"
+
+#include <ydb/library/yql/utils/log/log.h>
+
+namespace NYql::NDqs {
+
+TSession::~TSession() {
+ TGuard<TMutex> lock(Mutex);
+ for (auto actorId : Requests) {
+ ActorSystem->Send(actorId, new NActors::TEvents::TEvPoison());
+ }
+}
+
+void TSession::DeleteRequest(const NActors::TActorId& actorId)
+{
+ TGuard<TMutex> lock(Mutex);
+ Requests.erase(actorId);
+}
+
+void TSession::AddRequest(const NActors::TActorId& actorId)
+{
+ TGuard<TMutex> lock(Mutex);
+ Requests.insert(actorId);
+}
+
+TSessionStorage::TSessionStorage(
+ NActors::TActorSystem* actorSystem,
+ const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter)
+ : ActorSystem(actorSystem)
+ , SessionsCounter(sessionsCounter)
+{ }
+
+void TSessionStorage::CloseSession(const TString& sessionId)
+{
+ TGuard<TMutex> lock(SessionMutex);
+ auto it = Sessions.find(sessionId);
+ if (it == Sessions.end()) {
+ return;
+ }
+ SessionsByLastUpdate.erase(it->second.Iterator);
+ Sessions.erase(it);
+ *SessionsCounter = Sessions.size();
+}
+
+std::shared_ptr<TSession> TSessionStorage::GetSession(const TString& sessionId)
+{
+ Clean(TInstant::Now() - TDuration::Minutes(10));
+
+ TGuard<TMutex> lock(SessionMutex);
+ auto it = Sessions.find(sessionId);
+ if (it == Sessions.end()) {
+ return std::shared_ptr<TSession>();
+ } else {
+ SessionsByLastUpdate.erase(it->second.Iterator);
+ SessionsByLastUpdate.push_back({TInstant::Now(), sessionId});
+ it->second.Iterator = SessionsByLastUpdate.end();
+ it->second.Iterator--;
+ return it->second.Session;
+ }
+}
+
+bool TSessionStorage::OpenSession(const TString& sessionId, const TString& username)
+{
+ TGuard<TMutex> lock(SessionMutex);
+ if (Sessions.contains(sessionId)) {
+ return false;
+ }
+
+ SessionsByLastUpdate.push_back({TInstant::Now(), sessionId});
+ auto it = SessionsByLastUpdate.end(); --it;
+
+ Sessions[sessionId] = TSessionAndIterator {
+ std::make_shared<TSession>(username, ActorSystem),
+ it
+ };
+
+ *SessionsCounter = Sessions.size();
+
+ return true;
+}
+
+void TSessionStorage::Clean(TInstant before) {
+ TGuard<TMutex> lock(SessionMutex);
+ for (TSessionsByLastUpdate::iterator it = SessionsByLastUpdate.begin();
+ it != SessionsByLastUpdate.end(); )
+ {
+ if (it->LastUpdate < before) {
+ YQL_CLOG(INFO, ProviderDq) << "Drop session by timeout " << it->SessionId;
+ Sessions.erase(it->SessionId);
+ it = SessionsByLastUpdate.erase(it);
+ } else {
+ break;
+ }
+ }
+
+ *SessionsCounter = Sessions.size();
+}
+
+void TSessionStorage::PrintInfo() const {
+ YQL_CLOG(INFO, ProviderDq) << "SessionsByLastUpdate: " << SessionsByLastUpdate.size();
+ YQL_CLOG(DEBUG, ProviderDq) << "Sessions: " << Sessions.size();
+ ui64 currenSessionsCounter = *SessionsCounter;
+ YQL_CLOG(DEBUG, ProviderDq) << "SessionsCounter: " << currenSessionsCounter;
+}
+
+} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/grpc_session.h b/ydb/library/yql/providers/dq/service/grpc_session.h
new file mode 100644
index 00000000000..56e592a757d
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/grpc_session.h
@@ -0,0 +1,57 @@
+#pragma once
+#include <library/cpp/actors/core/actorsystem.h>
+
+namespace NYql::NDqs {
+
+class TSession {
+public:
+ TSession(const TString& username, NActors::TActorSystem* actorSystem)
+ : Username(username)
+ , ActorSystem(actorSystem)
+ { }
+
+ ~TSession();
+
+ const TString& GetUsername() const {
+ return Username;
+ }
+
+ void DeleteRequest(const NActors::TActorId& id);
+ void AddRequest(const NActors::TActorId& id);
+
+private:
+ TMutex Mutex;
+ const TString Username;
+ NActors::TActorSystem* ActorSystem;
+ THashSet<NActors::TActorId> Requests;
+};
+
+class TSessionStorage {
+public:
+ TSessionStorage(
+ NActors::TActorSystem* actorSystem,
+ const NMonitoring::TDynamicCounters::TCounterPtr& sessionsCounter);
+ void CloseSession(const TString& sessionId);
+ std::shared_ptr<TSession> GetSession(const TString& sessionId);
+ bool OpenSession(const TString& sessionId, const TString& username);
+ void Clean(TInstant before);
+ void PrintInfo() const;
+
+private:
+ NActors::TActorSystem* ActorSystem;
+ NMonitoring::TDynamicCounters::TCounterPtr SessionsCounter;
+ TMutex SessionMutex;
+ struct TTimeAndSessionId {
+ TInstant LastUpdate;
+ TString SessionId;
+ };
+ using TSessionsByLastUpdate = TList<TTimeAndSessionId>;
+ TSessionsByLastUpdate SessionsByLastUpdate;
+ struct TSessionAndIterator {
+ std::shared_ptr<TSession> Session;
+ TSessionsByLastUpdate::iterator Iterator;
+ };
+ THashMap<TString, TSessionAndIterator> Sessions;
+};
+
+} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
new file mode 100644
index 00000000000..e555d0051dc
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.cpp
@@ -0,0 +1,302 @@
+#include "interconnect_helpers.h"
+#include "service_node.h"
+
+#include "grpc_service.h"
+
+#include <library/cpp/actors/helpers/selfping_actor.h>
+
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/core/scheduler_actor.h>
+#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_common.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/yson/node/node_io.h>
+
+#include <util/stream/file.h>
+#include <util/system/env.h>
+
+namespace NYql::NDqs {
+ using namespace NActors;
+ using namespace NActors::NDnsResolver;
+ using namespace NGrpc;
+
+ class TYqlLogBackend: public TLogBackend {
+ void WriteData(const TLogRecord& rec) override {
+ TString message(rec.Data, rec.Len);
+ if (message.find("ICP01 ready to work") != TString::npos) {
+ return;
+ }
+ YQL_CLOG(DEBUG, ProviderDq) << message;
+ }
+
+ void ReopenLog() override { }
+ };
+
+ static void InitSelfPingActor(NActors::TActorSystemSetup* setup, NMonitoring::TDynamicCounterPtr rootCounters)
+ {
+ const TDuration selfPingInterval = TDuration::MilliSeconds(10);
+
+ const auto counters = rootCounters->GetSubgroup("counters", "utils");
+
+ for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) {
+ const auto& poolName = setup->GetPoolName(poolId);
+ auto poolGroup = counters->GetSubgroup("execpool", poolName);
+ auto selfPinfMaxCounter = poolGroup->GetCounter("SelfPingMaxUs", false);
+ auto selfPinfAvgCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
+ auto selfPinfAvgCounterIn1s = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
+ auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false);
+ IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, selfPinfMaxCounter,
+ selfPinfAvgCounter, selfPinfAvgCounterIn1s, cpuTimeCounter);
+ setup->LocalServices.push_back(
+ std::make_pair(TActorId(),
+ TActorSetupCmd(selfPingActor,
+ TMailboxType::HTSwap,
+ poolId)));
+ }
+ }
+
+ std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup(
+ ui32 nodeId,
+ TString interconnectAddress,
+ ui16 port,
+ SOCKET socket,
+ TVector<ui32> threads,
+ NMonitoring::TDynamicCounterPtr counters,
+ const TNameserverFactory& nameserverFactory,
+ TMaybe<ui32> maxNodeId,
+ const NYql::NProto::TDqConfig::TICSettings& icSettings)
+ {
+ auto setup = MakeHolder<TActorSystemSetup>();
+
+ setup->NodeId = nodeId;
+
+ if (threads.empty()) {
+ threads = {icSettings.GetThreads()};
+ }
+
+ setup->ExecutorsCount = threads.size();
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ setup->Executors[i] = new TBasicExecutorPool(
+ i,
+ threads[i],
+ 50,
+ "pool-"+ToString(i)// poolName
+ );
+ }
+ auto schedulerConfig = TSchedulerConfig();
+ schedulerConfig.MonCounters = counters;
+
+#define SET_VALUE(name) \
+ if (icSettings.Has ## name()) { \
+ schedulerConfig.name = icSettings.Get ## name (); \
+ YQL_CLOG(DEBUG, ProviderDq) << "Scheduler IC " << #name << " set to " << schedulerConfig.name; \
+ }
+
+ SET_VALUE(ResolutionMicroseconds);
+ SET_VALUE(SpinThreshold);
+ SET_VALUE(ProgressThreshold);
+ SET_VALUE(UseSchedulerActor);
+ SET_VALUE(RelaxedSendPaceEventsPerSecond);
+ SET_VALUE(RelaxedSendPaceEventsPerCycle);
+ SET_VALUE(RelaxedSendThresholdEventsPerSecond);
+ SET_VALUE(RelaxedSendThresholdEventsPerCycle);
+
+#undef SET_VALUE
+
+ setup->Scheduler = CreateSchedulerThread(schedulerConfig);
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing local services";
+ setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0));
+ if (IActor* schedulerActor = CreateSchedulerActor(schedulerConfig)) {
+ TActorId schedulerActorId = MakeSchedulerActorId();
+ setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0));
+ }
+
+ NActors::TActorId loggerActorId(nodeId, "logger");
+ auto logSettings = MakeIntrusive<NActors::NLog::TSettings>(loggerActorId,
+ 0, NActors::NLog::PRI_INFO);
+ static TString defaultComponent = "ActorLib";
+ logSettings->Append(0, 1024, [&](NActors::NLog::EComponent) -> const TString & { return defaultComponent; });
+ TString explanation = "";
+ if (YQL_CVLOG_ACTIVE(NLog::ELevel::TRACE, NLog::EComponent::CoreDq)) {
+ logSettings->SetLevel(NActors::NLog::PRI_TRACE, 535 /*NKikimrServices::KQP_COMPUTE*/, explanation);
+ logSettings->SetLevel(NActors::NLog::PRI_TRACE, 713 /*NKikimrServices::YQL_PROXY*/, explanation);
+ logSettings->SetLevel(NActors::NLog::PRI_TRACE, 1165 /*NKikimrServices::DQ_TASK_RUNNER*/, explanation);
+ }
+ NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(
+ logSettings,
+ new TYqlLogBackend,
+ counters->GetSubgroup("logger", "counters"));
+ setup->LocalServices.emplace_back(logSettings->LoggerActorId, TActorSetupCmd(loggerActor, TMailboxType::Simple, 0));
+
+ TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup();
+ THashSet<ui32> staticNodeId;
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing node table";
+ nameserverTable->StaticNodeTable[nodeId] = std::make_pair(interconnectAddress, port);
+
+ setup->LocalServices.emplace_back(
+ MakeDnsResolverActorId(), TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0));
+
+ setup->LocalServices.emplace_back(
+ GetNameserviceActorId(), TActorSetupCmd(nameserverFactory(nameserverTable), TMailboxType::ReadAsFilled, 0));
+
+
+ InitSelfPingActor(setup.Get(), counters);
+
+ TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon();
+ icCommon->NameserviceId = GetNameserviceActorId();
+ Y_UNUSED(counters);
+ //icCommon->MonCounters = counters->GetSubgroup("counters", "interconnect");
+ icCommon->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
+
+#define SET_DURATION(name) \
+ { \
+ icCommon->Settings.name = TDuration::MilliSeconds(icSettings.Get ## name ## Ms()); \
+ YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
+ }
+
+#define SET_VALUE(name) \
+ { \
+ icCommon->Settings.name = icSettings.Get ## name(); \
+ YQL_CLOG(DEBUG, ProviderDq) << "IC " << #name << " set to " << icCommon->Settings.name; \
+ }
+
+ SET_DURATION(Handshake);
+ SET_DURATION(DeadPeer);
+ SET_DURATION(CloseOnIdle);
+
+ SET_VALUE(SendBufferDieLimitInMB);
+ SET_VALUE(TotalInflightAmountOfData);
+ SET_VALUE(MergePerPeerCounters);
+ SET_VALUE(MergePerDataCenterCounters);
+ SET_VALUE(TCPSocketBufferSize);
+
+ SET_DURATION(PingPeriod);
+ SET_DURATION(ForceConfirmPeriod);
+ SET_DURATION(LostConnection);
+ SET_DURATION(BatchPeriod);
+
+ SET_DURATION(MessagePendingTimeout);
+
+ SET_VALUE(MessagePendingSize);
+ SET_VALUE(MaxSerializedEventSize);
+
+#undef SET_DURATION
+#undef SET_VALUE
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Initializing proxy actors";
+ auto effectiveMaxNodeId = maxNodeId.GetOrElse(static_cast<ui32>(ENodeIdLimits::MaxWorkerNodeId));
+ setup->Interconnect.ProxyActors.resize(effectiveMaxNodeId + 1);
+ for (ui32 id = 1; id <= effectiveMaxNodeId; ++id) {
+ if (nodeId != id) {
+ IActor* actor = new TInterconnectProxyTCP(id, icCommon);
+ setup->Interconnect.ProxyActors[id] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0);
+ }
+ }
+
+ // start listener
+ YQL_CLOG(DEBUG, ProviderDq) << "Start listener";
+ {
+ icCommon->TechnicalSelfHostName = interconnectAddress;
+ YQL_CLOG(INFO, ProviderDq) << "Start listener " << interconnectAddress << ":" << port << " socket: " << socket;
+ IActor* listener;
+ TMaybe<SOCKET> maybeSocket = socket < 0
+ ? Nothing()
+ : TMaybe<SOCKET>(socket);
+
+ listener = new NActors::TInterconnectListenerTCP(interconnectAddress, port, icCommon, maybeSocket);
+
+ setup->LocalServices.emplace_back(
+ MakeInterconnectListenerActorId(false),
+ TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0));
+ }
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Actor initialization complete";
+
+#ifdef _unix_
+ signal(SIGPIPE, SIG_IGN);
+#endif
+
+ return std::make_tuple(std::move(setup), logSettings);
+ }
+
+ std::tuple<TString, TString> GetLocalAddress(const TString* overrideHostname) {
+ constexpr auto MaxLocalHostNameLength = 4096;
+ std::array<char, MaxLocalHostNameLength> buffer;
+ buffer.fill(0);
+ TString hostName;
+ TString localAddress;
+
+ int result = gethostname(buffer.data(), buffer.size() - 1);
+ if (result != 0) {
+ Cerr << "gethostname failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << strerror(errno) << Endl;
+ return std::make_tuple(hostName, localAddress);
+ }
+
+ if (overrideHostname) {
+ memcpy(&buffer[0], overrideHostname->c_str(), Min<int>(
+ overrideHostname->size()+1, buffer.size()-1
+ ));
+ }
+
+ hostName = &buffer[0];
+
+ addrinfo request;
+ memset(&request, 0, sizeof(request));
+ request.ai_family = AF_INET6;
+ request.ai_socktype = SOCK_STREAM;
+
+ addrinfo* response = nullptr;
+ result = getaddrinfo(buffer.data(), nullptr, &request, &response);
+ if (result != 0) {
+ Cerr << "getaddrinfo failed for " << std::string_view{buffer.data(), buffer.size()} << " error " << gai_strerror(result) << Endl;
+ return std::make_tuple(hostName, localAddress);
+ }
+
+ std::unique_ptr<addrinfo, void (*)(addrinfo*)> holder(response, &freeaddrinfo);
+
+ if (!response->ai_addr) {
+ Cerr << "getaddrinfo failed: no ai_addr" << Endl;
+ return std::make_tuple(hostName, localAddress);
+ }
+
+ auto* sa = response->ai_addr;
+ Y_VERIFY(sa->sa_family == AF_INET6);
+ inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr),
+ &buffer[0], buffer.size() - 1);
+
+ localAddress = &buffer[0];
+
+ return std::make_tuple(hostName, localAddress);
+ }
+
+ std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& maybeUser, const TMaybe<TString>& maybeTokenFile)
+ {
+ auto home = GetEnv("HOME");
+ auto systemUser = GetEnv("USER");
+
+ TString userName = maybeUser
+ ? *maybeUser
+ : systemUser;
+
+ TString tokenFile = maybeTokenFile
+ ? *maybeTokenFile
+ : home + "/.yt/token";
+
+ TString token = TFileInput(tokenFile).ReadLine();
+
+ return std::make_tuple(userName, token);
+ }
+}
diff --git a/ydb/library/yql/providers/dq/service/interconnect_helpers.h b/ydb/library/yql/providers/dq/service/interconnect_helpers.h
new file mode 100644
index 00000000000..9e959cda160
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/interconnect_helpers.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/interconnect/poller_tcp.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/yson/node/node.h>
+
+#include <ydb/library/yql/providers/dq/config/config.pb.h>
+
+namespace NYql::NDqs {
+
+enum class ENodeIdLimits {
+ MinServiceNodeId = 1,
+ MaxServiceNodeId = 512, // excluding
+ MinWorkerNodeId = 512,
+ MaxWorkerNodeId = 8192, // excluding
+};
+
+using TNameserverFactory = std::function<NActors::IActor*(const TIntrusivePtr<NActors::TTableNameserverSetup>& setup)>;
+
+struct TServiceNodeConfig {
+ ui32 NodeId;
+ TString InterconnectAddress;
+ TString GrpcHostname;
+ ui16 Port;
+ ui16 GrpcPort = 8080;
+ ui16 MbusPort = 0;
+ SOCKET Socket = -1;
+ SOCKET GrpcSocket = -1;
+ TMaybe<ui32> MaxNodeId;
+ NYql::NProto::TDqConfig::TICSettings ICSettings = NYql::NProto::TDqConfig::TICSettings();
+ TNameserverFactory NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
+ return CreateNameserverTable(setup);
+ };
+};
+
+std::tuple<TString, TString> GetLocalAddress(const TString* hostname = nullptr);
+std::tuple<TString, TString> GetUserToken(const TMaybe<TString>& user, const TMaybe<TString>& tokenFile);
+
+std::tuple<THolder<NActors::TActorSystemSetup>, TIntrusivePtr<NActors::NLog::TSettings>> BuildActorSetup(
+ ui32 nodeId,
+ TString interconnectAddress,
+ ui16 port,
+ SOCKET socket,
+ TVector<ui32> threads,
+ NMonitoring::TDynamicCounterPtr counters,
+ const TNameserverFactory& nameserverFactory,
+ TMaybe<ui32> maxNodeId,
+ const NYql::NProto::TDqConfig::TICSettings& icSettings = NYql::NProto::TDqConfig::TICSettings());
+
+} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp
new file mode 100644
index 00000000000..afc141dc537
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/service_node.cpp
@@ -0,0 +1,167 @@
+#include "service_node.h"
+
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/providers/common/metrics/metrics_registry.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
+
+#include <library/cpp/grpc/server/actors/logger.h>
+
+#include <utility>
+
+namespace NYql {
+ using namespace NActors;
+ using namespace NGrpc;
+ using namespace NYql::NDqs;
+
+ class TGrpcExternalListener: public IExternalListener {
+ public:
+ TGrpcExternalListener(SOCKET socket)
+ : Socket(socket)
+ , Listener(MakeIntrusive<NInterconnect::TStreamSocket>(Socket))
+ {
+ SetNonBlock(socket, true);
+ }
+
+ void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) override {
+ Acceptor = std::move(acceptor);
+ }
+
+ private:
+ void Start() override {
+ YQL_CLOG(DEBUG, ProviderDq) << "Start GRPC Listener";
+ Poller.Start();
+ StartRead();
+ }
+
+ void Stop() override {
+ Poller.Stop();
+ }
+
+ void StartRead() {
+ YQL_CLOG(TRACE, ProviderDq) << "Read next GRPC event";
+ Poller.StartRead(Listener, [&](const TIntrusivePtr<TSharedDescriptor>& ss) {
+ return Accept(ss);
+ });
+ }
+
+ TDelegate Accept(const TIntrusivePtr<TSharedDescriptor>& ss)
+ {
+ NInterconnect::TStreamSocket* socket = (NInterconnect::TStreamSocket*)ss.Get();
+ int r = 0;
+ while (r >= 0) {
+ NInterconnect::TAddress address;
+ r = socket->Accept(address);
+ if (r >= 0) {
+ YQL_CLOG(TRACE, ProviderDq) << "New GRPC connection";
+ grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters params;
+ SetNonBlock(r, true);
+ params.listener_fd = -1; // static_cast<int>(Socket);
+ params.fd = r;
+ Acceptor->HandleNewConnection(&params);
+ } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Unknown error code " + ToString(r);
+ }
+ }
+
+ return [this] {
+ StartRead();
+ };
+ }
+
+ SOCKET Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Listener;
+ NInterconnect::TPollerThreads Poller;
+ std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> Acceptor;
+ };
+
+ TServiceNode::TServiceNode(
+ const TServiceNodeConfig& config,
+ ui32 threads,
+ IMetricsRegistryPtr metricsRegistry)
+ : Config(config)
+ , Threads(threads)
+ , MetricsRegistry(std::move(metricsRegistry))
+ {
+ std::tie(Setup, LogSettings) = BuildActorSetup(
+ Config.NodeId,
+ Config.InterconnectAddress,
+ Config.Port,
+ Config.Socket,
+ {Threads, 8},
+ MetricsRegistry->GetSensors(),
+ Config.NameserverFactory,
+ Config.MaxNodeId,
+ Config.ICSettings);
+ }
+
+ void TServiceNode::AddLocalService(TActorId actorId, TActorSetupCmd service) {
+ YQL_ENSURE(!ActorSystem);
+ Setup->LocalServices.emplace_back(actorId, std::move(service));
+ }
+
+ NActors::TActorSystem* TServiceNode::StartActorSystem(void* appData) {
+ Y_VERIFY(!ActorSystem);
+
+ ActorSystem = MakeHolder<NActors::TActorSystem>(Setup, appData, LogSettings);
+ ActorSystem->Start();
+
+ return ActorSystem.Get();
+ }
+
+ void TServiceNode::StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories) {
+ class TCustomOption : public grpc::ServerBuilderOption {
+ public:
+ TCustomOption() { }
+
+ void UpdateArguments(grpc::ChannelArguments *args) override {
+ args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 1);
+ }
+
+ void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
+ { }
+ };
+
+ YQL_CLOG(INFO, ProviderDq) << "Starting GRPC on " << Config.GrpcPort;
+
+ IExternalListener::TPtr listener = nullptr;
+ if (Config.GrpcSocket >= 0) {
+ listener = MakeIntrusive<TGrpcExternalListener>(Config.GrpcSocket);
+ }
+
+ auto options = TServerOptions()
+ // .SetHost(CurrentNode->Address)
+ .SetHost("[::]")
+ .SetPort(Config.GrpcPort)
+ .SetExternalListener(listener)
+ .SetWorkerThreads(2)
+ .SetGRpcMemoryQuotaBytes(1024 * 1024 * 1024)
+ .SetMaxMessageSize(1024 * 1024 * 256)
+ .SetMaxGlobalRequestInFlight(50000)
+ .SetUseAuth(false)
+ .SetKeepAliveEnable(true)
+ .SetKeepAliveIdleTimeoutTriggerSec(360)
+ .SetKeepAliveMaxProbeCount(3)
+ .SetKeepAliveProbeIntervalSec(1)
+ .SetServerBuilderMutator([](grpc::ServerBuilder& builder) {
+ builder.SetOption(std::make_unique<TCustomOption>());
+ })
+ .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER
+
+ Server = MakeHolder<TGRpcServer>(options);
+ Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
+ Server->AddService(Service);
+ Server->Start();
+ }
+
+ void TServiceNode::Stop(TDuration timeout) {
+ (static_cast<TDqsGrpcService*>(Service.Get()))->Stop().Wait(timeout);
+
+ Server->Stop();
+ for (auto id : ActorIds) {
+ ActorSystem->Send(id, new NActors::TEvents::TEvPoison);
+ }
+ ActorSystem->Stop();
+ }
+} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h
new file mode 100644
index 00000000000..3f60fb3cff9
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/service_node.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include "grpc_service.h"
+#include "interconnect_helpers.h"
+
+#include <ydb/library/yql/providers/common/metrics/metrics_registry.h>
+#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
+
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_common.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/poller_actor.h>
+
+namespace NYql {
+ class TServiceNode {
+ public:
+ TServiceNode(
+ const NDqs::TServiceNodeConfig& config,
+ ui32 threads,
+ IMetricsRegistryPtr metricsRegistry);
+
+ void AddLocalService(NActors::TActorId actorId, NActors::TActorSetupCmd service);
+ NActors::TActorSystem* StartActorSystem(void* appData = nullptr);
+ void StartService(const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
+
+ void Stop(TDuration time = TDuration::Max());
+
+ NActors::TActorSystemSetup* GetSetup() const {
+ return Setup.Get();
+ }
+
+ private:
+ NDqs::TServiceNodeConfig Config;
+ ui32 Threads;
+ IMetricsRegistryPtr MetricsRegistry;
+ THolder<NActors::TActorSystemSetup> Setup;
+ TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
+ THolder<NActors::TActorSystem> ActorSystem;
+ TVector<NActors::TActorId> ActorIds;
+ THolder<NGrpc::TGRpcServer> Server;
+ TIntrusivePtr<NGrpc::IGRpcService> Service;
+ };
+}
diff --git a/ydb/library/yql/providers/dq/service/ya.make b/ydb/library/yql/providers/dq/service/ya.make
new file mode 100644
index 00000000000..5652eaf0f5c
--- /dev/null
+++ b/ydb/library/yql/providers/dq/service/ya.make
@@ -0,0 +1,33 @@
+LIBRARY()
+
+SRCS(
+ grpc_service.cpp
+ grpc_session.cpp
+ service_node.cpp
+ interconnect_helpers.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/actors/dnsresolver
+ library/cpp/actors/interconnect
+ library/cpp/build_info
+ library/cpp/grpc/server
+ library/cpp/grpc/server/actors
+ library/cpp/svnversion
+ library/cpp/threading/future
+ ydb/library/yql/sql
+ ydb/public/api/protos
+ ydb/library/yql/providers/common/metrics
+ ydb/library/yql/providers/dq/actors
+ ydb/library/yql/providers/dq/api/grpc
+ ydb/library/yql/providers/dq/common
+ ydb/library/yql/providers/dq/counters
+ ydb/library/yql/providers/dq/interface
+ ydb/library/yql/providers/dq/worker_manager
+ ydb/library/yql/providers/dq/worker_manager/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..233f0fe82bc
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-stats_collector)
+target_compile_options(providers-dq-stats_collector PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-stats_collector PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-helpers
+ cpp-monlib-dynamic_counters
+)
+target_sources(providers-dq-stats_collector PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp
+)
diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..bf5c2e8374e
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-stats_collector)
+target_compile_options(providers-dq-stats_collector PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-stats_collector PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-helpers
+ cpp-monlib-dynamic_counters
+)
+target_sources(providers-dq-stats_collector PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp
+)
diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..bf5c2e8374e
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-dq-stats_collector)
+target_compile_options(providers-dq-stats_collector PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-dq-stats_collector PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-helpers
+ cpp-monlib-dynamic_counters
+)
+target_sources(providers-dq-stats_collector PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp
+)
diff --git a/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt
new file mode 100644
index 00000000000..606ff46b4be
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp
new file mode 100644
index 00000000000..b9a1303b3be
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.cpp
@@ -0,0 +1,31 @@
+#include "pool_stats_collector.h"
+
+#include <library/cpp/actors/helpers/pool_stats_collector.h>
+
+namespace NYql {
+
+using namespace NActors;
+using namespace NMonitoring;
+
+namespace {
+
+TIntrusivePtr<TDynamicCounters> GetServiceCounters(TIntrusivePtr<TDynamicCounters> root,
+ const TString &service)
+{
+ auto res = root->GetSubgroup("counters", service);
+ auto utils = root->GetSubgroup("counters", "utils");
+ auto lookupCounter = utils->GetSubgroup("component", service)->GetCounter("CounterLookups", true);
+ res->SetLookupCounter(lookupCounter);
+ return res;
+}
+
+}
+
+IActor *CreateStatsCollector(ui32 intervalSec,
+ const TActorSystemSetup& setup,
+ NMonitoring::TDynamicCounterPtr counters)
+{
+ return new NActors::TStatsCollectingActor(intervalSec, setup, GetServiceCounters(counters, "utils"));
+}
+
+} // namespace NKikimr
diff --git a/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h
new file mode 100644
index 00000000000..77c2cab9a60
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NActors {
+ struct TActorSystemSetup;
+}
+
+// copy from kikimr/core/base/pool_stats_collector.h
+
+namespace NYql {
+
+ NActors::IActor* CreateStatsCollector(ui32 intervalSec,
+ const NActors::TActorSystemSetup& setup,
+ NMonitoring::TDynamicCounterPtr counters);
+
+}
diff --git a/ydb/library/yql/providers/dq/stats_collector/ya.make b/ydb/library/yql/providers/dq/stats_collector/ya.make
new file mode 100644
index 00000000000..dcd30c4e90a
--- /dev/null
+++ b/ydb/library/yql/providers/dq/stats_collector/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SET(
+ SOURCE
+ pool_stats_collector.cpp
+)
+
+SRCS(
+ ${SOURCE}
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/actors/helpers
+ library/cpp/monlib/dynamic_counters
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..c762ef5d46c
--- /dev/null
+++ b/ydb/library/yql/providers/yt/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(codec)
+add_subdirectory(common)
+add_subdirectory(comp_nodes)
+add_subdirectory(dq_task_preprocessor)
+add_subdirectory(expr_nodes)
+add_subdirectory(gateway)
+add_subdirectory(job)
+add_subdirectory(lib)
+add_subdirectory(mkql_dq)
+add_subdirectory(opt)
+add_subdirectory(provider)
diff --git a/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..c762ef5d46c
--- /dev/null
+++ b/ydb/library/yql/providers/yt/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(codec)
+add_subdirectory(common)
+add_subdirectory(comp_nodes)
+add_subdirectory(dq_task_preprocessor)
+add_subdirectory(expr_nodes)
+add_subdirectory(gateway)
+add_subdirectory(job)
+add_subdirectory(lib)
+add_subdirectory(mkql_dq)
+add_subdirectory(opt)
+add_subdirectory(provider)
diff --git a/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..c762ef5d46c
--- /dev/null
+++ b/ydb/library/yql/providers/yt/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(codec)
+add_subdirectory(common)
+add_subdirectory(comp_nodes)
+add_subdirectory(dq_task_preprocessor)
+add_subdirectory(expr_nodes)
+add_subdirectory(gateway)
+add_subdirectory(job)
+add_subdirectory(lib)
+add_subdirectory(mkql_dq)
+add_subdirectory(opt)
+add_subdirectory(provider)
diff --git a/ydb/library/yql/providers/yt/CMakeLists.txt b/ydb/library/yql/providers/yt/CMakeLists.txt
index 8e464fc08e9..f8b31df0c11 100644
--- a/ydb/library/yql/providers/yt/CMakeLists.txt
+++ b/ydb/library/yql/providers/yt/CMakeLists.txt
@@ -6,13 +6,12 @@
# original buildsystem will not be accepted.
-add_subdirectory(codec)
-add_subdirectory(common)
-add_subdirectory(comp_nodes)
-add_subdirectory(expr_nodes)
-add_subdirectory(gateway)
-add_subdirectory(job)
-add_subdirectory(lib)
-add_subdirectory(mkql_dq)
-add_subdirectory(opt)
-add_subdirectory(provider)
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..8e464fc08e9
--- /dev/null
+++ b/ydb/library/yql/providers/yt/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(codec)
+add_subdirectory(common)
+add_subdirectory(comp_nodes)
+add_subdirectory(expr_nodes)
+add_subdirectory(gateway)
+add_subdirectory(job)
+add_subdirectory(lib)
+add_subdirectory(mkql_dq)
+add_subdirectory(opt)
+add_subdirectory(provider)
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..7d03fe3302f
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-yt-dq_task_preprocessor)
+target_compile_options(providers-yt-dq_task_preprocessor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ library-yql-utils
+ yql-utils-log
+ yql-utils-failure_injector
+ library-yql-minikql
+ minikql-computation-llvm
+ providers-common-codec
+ providers-dq-interface
+ providers-yt-codec
+ yt-gateway-lib
+ yt-lib-yson_helpers
+)
+target_sources(providers-yt-dq_task_preprocessor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp
+)
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..b03d0481c58
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-yt-dq_task_preprocessor)
+target_compile_options(providers-yt-dq_task_preprocessor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ library-yql-utils
+ yql-utils-log
+ yql-utils-failure_injector
+ library-yql-minikql
+ minikql-computation-llvm
+ providers-common-codec
+ providers-dq-interface
+ providers-yt-codec
+ yt-gateway-lib
+ yt-lib-yson_helpers
+)
+target_sources(providers-yt-dq_task_preprocessor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp
+)
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..b03d0481c58
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-yt-dq_task_preprocessor)
+target_compile_options(providers-yt-dq_task_preprocessor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-yt-dq_task_preprocessor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ library-yql-utils
+ yql-utils-log
+ yql-utils-failure_injector
+ library-yql-minikql
+ minikql-computation-llvm
+ providers-common-codec
+ providers-dq-interface
+ providers-yt-codec
+ yt-gateway-lib
+ yt-lib-yson_helpers
+)
+target_sources(providers-yt-dq_task_preprocessor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp
+)
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt
new file mode 100644
index 00000000000..606ff46b4be
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make b/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make
new file mode 100644
index 00000000000..2e43b4600b3
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/ya.make
@@ -0,0 +1,24 @@
+LIBRARY()
+
+SRC(yql_yt_dq_task_preprocessor.cpp)
+
+PEERDIR(
+ library/cpp/yson
+ library/cpp/yson/node
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/interface
+ ydb/library/yql/utils
+ ydb/library/yql/utils/log
+ ydb/library/yql/utils/failure_injector
+ ydb/library/yql/minikql
+ ydb/library/yql/minikql/computation/llvm
+ ydb/library/yql/providers/common/codec
+ ydb/library/yql/providers/dq/interface
+ ydb/library/yql/providers/yt/codec
+ ydb/library/yql/providers/yt/gateway/lib
+ ydb/library/yql/providers/yt/lib/yson_helpers
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp
new file mode 100644
index 00000000000..8251e293078
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.cpp
@@ -0,0 +1,365 @@
+#include "yql_yt_dq_task_preprocessor.h"
+
+#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
+#include <ydb/library/yql/providers/yt/codec/yt_codec.h>
+#include <ydb/library/yql/providers/yt/codec/yt_codec_io.h>
+#include <ydb/library/yql/providers/yt/lib/yson_helpers/yson_helpers.h>
+#include <ydb/library/yql/providers/common/codec/yql_codec.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/failure_injector/failure_injector.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_mem_info.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <yt/cpp/mapreduce/client/client.h>
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <library/cpp/yson/node/node.h>
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/node/node_visitor.h>
+#include <library/cpp/yson/parser.h>
+#include <library/cpp/yson/writer.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/guid.h>
+#include <util/generic/yexception.h>
+#include <util/generic/ptr.h>
+#include <util/system/env.h>
+#include <util/stream/str.h>
+#include <util/stream/file.h>
+#include <util/stream/mem.h>
+
+#include <string_view>
+
+namespace NYql::NDq {
+
+using namespace NKikimr::NMiniKQL;
+
+namespace {
+
+class TYtFullResultWriter: public IDqFullResultWriter {
+public:
+ TYtFullResultWriter(IFunctionRegistry::TPtr funcRegistry, NYT::ITransactionPtr tx, const TString& path, const TString& spec)
+ : FuncRegistry_(std::move(funcRegistry))
+ , Alloc(__LOCATION__)
+ , TypeEnv(Alloc)
+ , MemInfo("DqFullResultWriter")
+ , HolderFactory(Alloc.Ref(), MemInfo, FuncRegistry_.Get())
+ , CodecContext(TypeEnv, *FuncRegistry_, &HolderFactory)
+ , Specs()
+ {
+ Specs.SetUseSkiff("");
+ Specs.Init(CodecContext, spec);
+ OutStream = tx->CreateRawWriter(NYT::TRichYPath{path}, Specs.MakeOutputFormat(), NYT::TTableWriterOptions());
+ TableWriter = MakeHolder<TMkqlWriterImpl>(OutStream, 4_MB);
+ TableWriter->SetSpecs(Specs);
+ Alloc.Release();
+ }
+
+ ~TYtFullResultWriter() {
+ try {
+ Abort();
+ } catch (...) {
+ }
+ Alloc.Acquire();
+ };
+
+ void AddRow(const NUdf::TUnboxedValuePod& row) override {
+ YQL_ENSURE(!Finished);
+ try {
+ TableWriter->AddRow(row);
+ ++RowCount;
+ } catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ }
+
+ void Finish() override {
+ if (!Finished) {
+ try {
+ TableWriter->Finish();
+ } catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ Finished = true;
+ }
+ }
+
+ void Abort() override {
+ if (!Finished) {
+ try {
+ TableWriter->Abort();
+ OutStream->Abort();
+ } catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ Finished = true;
+ }
+ }
+
+ ui64 GetRowCount() const override {
+ return RowCount;
+ }
+
+private:
+ const IFunctionRegistry::TPtr FuncRegistry_;
+ TScopedAlloc Alloc;
+ TTypeEnvironment TypeEnv;
+ TMemoryUsageInfo MemInfo;
+ THolderFactory HolderFactory;
+ NCommon::TCodecContext CodecContext;
+ TMkqlIOSpecs Specs;
+ NYT::TRawTableWriterPtr OutStream;
+ THolder<TMkqlWriterImpl> TableWriter;
+ ui64 RowCount{0};
+ bool Finished = false;
+};
+
+class TFileFullResultWriter: public IDqFullResultWriter {
+public:
+ TFileFullResultWriter(IFunctionRegistry::TPtr funcRegistry, const TString& path, const TString& spec, const TString& attrs)
+ : Path(path)
+ , Attrs(attrs)
+ , FuncRegistry_(std::move(funcRegistry))
+ , Alloc(__LOCATION__)
+ , TypeEnv(Alloc)
+ , MemInfo("DqFullResultWriter")
+ , HolderFactory(Alloc.Ref(), MemInfo, FuncRegistry_.Get())
+ , CodecContext(TypeEnv, *FuncRegistry_, &HolderFactory)
+ , Specs()
+ {
+ Specs.Init(CodecContext, spec);
+ TableWriter = MakeHolder<TMkqlWriterImpl>(OutStream, 1, 4_MB);
+ TableWriter->SetSpecs(Specs);
+ Alloc.Release();
+ }
+
+ ~TFileFullResultWriter() {
+ Alloc.Acquire();
+ };
+
+ void AddRow(const NUdf::TUnboxedValuePod& row) override {
+ YQL_ENSURE(!Finished);
+ TableWriter->AddRow(row);
+ ++RowCount;
+ }
+
+ void Finish() override {
+ if (!Finished) {
+ TableWriter->Finish();
+ if (auto binaryYson = OutStream.Str()) {
+ TMemoryInput in(binaryYson);
+ TOFStream of(Path);
+ TDoubleHighPrecisionYsonWriter writer(&of, ::NYson::EYsonType::ListFragment);
+ NYson::TYsonParser parser(&writer, &in, ::NYson::EYsonType::ListFragment);
+ parser.Parse();
+ }
+ else {
+ YQL_ENSURE(TFile(Path, CreateAlways | WrOnly).IsOpen(), "Failed to create " << Path.Quote() << " file");
+ }
+
+ {
+ NYT::TNode attrs = NYT::NodeFromYsonString(Attrs);
+ TOFStream ofAttr(Path + ".attr");
+ NYson::TYsonWriter writer(&ofAttr, NYson::EYsonFormat::Pretty, ::NYson::EYsonType::Node);
+ NYT::TNodeVisitor visitor(&writer);
+ visitor.Visit(attrs);
+ }
+
+ Finished = true;
+ }
+ }
+
+ void Abort() override {
+ Finished = true;
+ }
+
+ ui64 GetRowCount() const override {
+ return RowCount;
+ }
+
+private:
+ const TString Path;
+ const TString Attrs;
+ const IFunctionRegistry::TPtr FuncRegistry_;
+ TScopedAlloc Alloc;
+ TTypeEnvironment TypeEnv;
+ TMemoryUsageInfo MemInfo;
+ THolderFactory HolderFactory;
+ NCommon::TCodecContext CodecContext;
+ TMkqlIOSpecs Specs;
+ TStringStream OutStream;
+ THolder<TMkqlWriterImpl> TableWriter;
+ ui64 RowCount{0};
+ bool Finished = false;
+};
+
+} // unnamed
+
+class TYtDqTaskPreprocessor : public IDqTaskPreprocessor {
+public:
+ explicit TYtDqTaskPreprocessor(bool ytEmulationMode, IFunctionRegistry::TPtr funcRegistry)
+ : YtEmulationMode_(ytEmulationMode)
+ , FuncRegistry_(std::move(funcRegistry))
+ {
+ }
+
+ TYtDqTaskPreprocessor(TYtDqTaskPreprocessor&&) = default;
+ TYtDqTaskPreprocessor& operator=(TYtDqTaskPreprocessor&&) = default;
+
+ THashMap<TString, TString> GetTaskParams(const THashMap<TString, TString>& graphParams, const THashMap<TString, TString>& secureParams) final {
+ YQL_LOG_CTX_SCOPE(TStringBuf("YtDqTaskPreprocessor"), __FUNCTION__);
+ THashMap<TString, TString> result;
+
+ GraphParams_ = graphParams;
+ SecureParams_ = secureParams;
+ if (YtEmulationMode_) {
+ YQL_LOG(DEBUG) << "No nested transactions are created in YT emulation mode, skipping";
+ result["yt.write.tx"] = GetGuidAsString(TGUID());
+ } else if (auto p = graphParams.FindPtr("yt.write")) {
+ try {
+ auto params = NYT::NodeFromYsonString(*p);
+ auto rootTxId = params["root_tx"].AsString();
+ auto server = params["server"].AsString();
+ auto token = params["token"].AsString();
+ auto& client = Clients_[std::make_pair(server, token)];
+ NYT::TCreateClientOptions opts;
+ if (token) {
+ opts.Token(secureParams.Value(token, ""));
+ }
+ client = NYT::CreateClient(server, std::move(opts));
+ auto rootTx = client->AttachTransaction(GetGuid(rootTxId));
+ auto subTx = rootTx->StartTransaction();
+ auto subTxId = GetGuidAsString(subTx->GetId());
+ TFailureInjector::Reach("expire_tx", [&] {
+ subTx->Abort();
+ subTx = rootTx->StartTransaction(NYT::TStartTransactionOptions().Timeout(TDuration::Seconds(1)).AutoPingable(false));
+ subTxId = GetGuidAsString(subTx->GetId());
+ ::Sleep(TDuration::Seconds(1));
+ });
+
+ YQL_LOG(DEBUG) << "Cluster " << server << ", creating nested " << subTxId << " for " << rootTxId;
+ result["yt.write.tx"] = subTxId;
+ WriteSubTx_ = std::move(subTx);
+ }
+ catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ }
+ return result;
+ }
+
+ void Finish(bool success) override {
+ GraphParams_.clear();
+ SecureParams_.clear();
+ try {
+ if (WriteSubTx_) {
+ if (success) {
+ YQL_LOG(DEBUG) << "Committing " << GetGuidAsString(WriteSubTx_->GetId());
+ TFailureInjector::Reach("fail_commit", [] { throw yexception() << "fail_commit"; });
+ WriteSubTx_->Commit();
+ } else {
+ YQL_LOG(DEBUG) << "Aborting " << GetGuidAsString(WriteSubTx_->GetId());
+ WriteSubTx_->Abort();
+ }
+ WriteSubTx_.Reset();
+ }
+ if (FullResultSubTx_) {
+ if (success) {
+ YQL_LOG(DEBUG) << "Committing " << GetGuidAsString(FullResultSubTx_->GetId());
+ TFailureInjector::Reach("full_result_fail_commit", [] { throw yexception() << "full_result_fail_commit"; });
+ FullResultSubTx_->Commit();
+ } else {
+ YQL_LOG(DEBUG) << "Aborting " << GetGuidAsString(FullResultSubTx_->GetId());
+ FullResultSubTx_->Abort();
+ }
+ FullResultSubTx_.Reset();
+ }
+ Clients_.clear();
+ } catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ }
+
+ THolder<IDqFullResultWriter> CreateFullResultWriter() override {
+ if (auto p = GraphParams_.FindPtr("yt.full_result_table")) {
+ try {
+ auto params = NYT::NodeFromYsonString(*p);
+ if (YtEmulationMode_) {
+ return MakeHolder<TFileFullResultWriter>(FuncRegistry_, params["path"].AsString(), params["codecSpec"].AsString(), params["tableAttrs"].AsString());
+ } else {
+ auto server = params["server"].AsString();
+ auto token = params["token"].AsString();
+ auto& client = Clients_[std::make_pair(server, token)];
+ if (!client) {
+ NYT::TCreateClientOptions opts;
+ if (token) {
+ opts.Token(SecureParams_.Value(token, ""));
+ }
+ client = NYT::CreateClient(server, std::move(opts));
+ }
+
+ NYT::ITransactionPtr subTx;
+ NYT::IClientBasePtr parentTx = client;
+ if (params.HasKey("root_tx")) {
+ auto rootTx = client->AttachTransaction(GetGuid(params["root_tx"].AsString()));
+ subTx = rootTx->StartTransaction();
+ if (params.HasKey("external_tx")) {
+ parentTx = client->AttachTransaction(GetGuid(params["external_tx"].AsString()));
+ }
+ } else if (params.HasKey("external_tx")) {
+ parentTx = client->AttachTransaction(GetGuid(params["external_tx"].AsString()));
+ subTx = parentTx->StartTransaction();
+ } else {
+ subTx = client->StartTransaction();
+ }
+
+ auto path = params["path"].AsString();
+ CreateParents({path}, parentTx);
+
+ subTx->Create(path, NYT::NT_TABLE,
+ NYT::TCreateOptions().Force(true).Attributes(NYT::NodeFromYsonString(params["tableAttrs"].AsString()))
+ );
+
+ TFailureInjector::Reach("full_result_fail_create", [&] { throw yexception() << "full_result_fail_create"; });
+
+ FullResultSubTx_ = subTx;
+
+ return MakeHolder<TYtFullResultWriter>(FuncRegistry_, subTx, path, params["codecSpec"].AsString());
+ }
+ }
+ catch (const NYT::TErrorResponse& e) {
+ TString errMsg = GetEnv("YQL_DETERMINISTIC_MODE") ? e.GetError().ShortDescription() : TString(e.what());
+ throw yexception() << errMsg;
+ }
+ }
+
+ return {};
+ }
+
+private:
+ THashMap<TString, TString> GraphParams_;
+ THashMap<TString, TString> SecureParams_;
+ THashMap<std::pair<TString, TString>, NYT::IClientPtr> Clients_;
+ NYT::ITransactionPtr WriteSubTx_;
+ NYT::ITransactionPtr FullResultSubTx_;
+
+ const bool YtEmulationMode_;
+ const IFunctionRegistry::TPtr FuncRegistry_;
+};
+
+TDqTaskPreprocessorFactory CreateYtDqTaskPreprocessorFactory(bool ytEmulationMode, IFunctionRegistry::TPtr funcRegistry) {
+ return [=]() {
+ return new TYtDqTaskPreprocessor(ytEmulationMode, funcRegistry);
+ };
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h
new file mode 100644
index 00000000000..6939505b89b
--- /dev/null
+++ b/ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+
+namespace NYql::NDq {
+
+TDqTaskPreprocessorFactory CreateYtDqTaskPreprocessorFactory(bool ytEmulationMode, NKikimr::NMiniKQL::IFunctionRegistry::TPtr funcRegistry);
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt
index 02221978de6..ccad74919ca 100644
--- a/ydb/library/yql/tools/CMakeLists.txt
+++ b/ydb/library/yql/tools/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(astdiff)
+add_subdirectory(dqrun)
add_subdirectory(mrjob)
add_subdirectory(sql2yql)
add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..5b83f580994
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,94 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dqrun)
+target_compile_options(dqrun PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dqrun PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ contrib-libs-protobuf
+ client-ydb_persqueue_public-codecs
+ cpp-actors-http
+ library-cpp-getopt
+ cpp-lfalloc-alloc_profiler
+ library-cpp-logger
+ library-cpp-resource
+ library-cpp-yson
+ cpp-mapreduce-interface
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-proto
+ core-file_storage-http_download
+ yql-core-services
+ core-services-mounts
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ minikql-invoke_builtins-llvm
+ providers-clickhouse-actors
+ providers-clickhouse-provider
+ providers-common-comp_nodes
+ providers-common-proto
+ providers-common-udf_resolve
+ providers-generic-actors
+ providers-generic-provider
+ providers-dq-local_gateway
+ providers-dq-provider
+ dq-provider-exec
+ providers-pq-async_io
+ pq-gateway-native
+ providers-pq-provider
+ providers-s3-actors
+ providers-s3-provider
+ providers-solomon-gateway
+ providers-solomon-provider
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ providers-ydb-provider
+ udf-service-terminate_policy
+ yql-utils-backtrace
+ yql-utils-bindings
+ yql-utils-log
+ yql-core-url_preprocessing
+ yql-core-url_lister
+ yt-comp_nodes-dq
+ providers-yt-dq_task_preprocessor
+ yt-gateway-file
+ yt-gateway-native
+ providers-yt-mkql_dq
+ providers-yt-provider
+ yt-lib-yt_download
+ yt-lib-yt_url_lister
+ yt-lib-config_clusters
+ yql-parser-pg_wrapper
+ utils-log-proto
+ yql-utils-actor_system
+ fq-libs-actors
+ fq-libs-db_id_async_resolver_impl
+ clickhouse_client_udf
+)
+target_link_options(dqrun PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(dqrun PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp
+)
+target_allocator(dqrun
+ cpp-malloc-jemalloc
+)
+vcs_info(dqrun)
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..e5251bfa438
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,98 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dqrun)
+target_compile_options(dqrun PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dqrun PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+ client-ydb_persqueue_public-codecs
+ cpp-actors-http
+ library-cpp-getopt
+ cpp-lfalloc-alloc_profiler
+ library-cpp-logger
+ library-cpp-resource
+ library-cpp-yson
+ cpp-mapreduce-interface
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-proto
+ core-file_storage-http_download
+ yql-core-services
+ core-services-mounts
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ minikql-invoke_builtins-llvm
+ providers-clickhouse-actors
+ providers-clickhouse-provider
+ providers-common-comp_nodes
+ providers-common-proto
+ providers-common-udf_resolve
+ providers-generic-actors
+ providers-generic-provider
+ providers-dq-local_gateway
+ providers-dq-provider
+ dq-provider-exec
+ providers-pq-async_io
+ pq-gateway-native
+ providers-pq-provider
+ providers-s3-actors
+ providers-s3-provider
+ providers-solomon-gateway
+ providers-solomon-provider
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ providers-ydb-provider
+ udf-service-terminate_policy
+ yql-utils-backtrace
+ yql-utils-bindings
+ yql-utils-log
+ yql-core-url_preprocessing
+ yql-core-url_lister
+ yt-comp_nodes-dq
+ providers-yt-dq_task_preprocessor
+ yt-gateway-file
+ yt-gateway-native
+ providers-yt-mkql_dq
+ providers-yt-provider
+ yt-lib-yt_download
+ yt-lib-yt_url_lister
+ yt-lib-config_clusters
+ yql-parser-pg_wrapper
+ utils-log-proto
+ yql-utils-actor_system
+ fq-libs-actors
+ fq-libs-db_id_async_resolver_impl
+ clickhouse_client_udf
+)
+target_link_options(dqrun PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(dqrun PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp
+)
+target_allocator(dqrun
+ cpp-malloc-jemalloc
+)
+vcs_info(dqrun)
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..9d712a457d5
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,99 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dqrun)
+target_compile_options(dqrun PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dqrun PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ contrib-libs-protobuf
+ client-ydb_persqueue_public-codecs
+ cpp-actors-http
+ library-cpp-getopt
+ cpp-lfalloc-alloc_profiler
+ library-cpp-logger
+ library-cpp-resource
+ library-cpp-yson
+ cpp-mapreduce-interface
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-proto
+ core-file_storage-http_download
+ yql-core-services
+ core-services-mounts
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ minikql-invoke_builtins-llvm
+ providers-clickhouse-actors
+ providers-clickhouse-provider
+ providers-common-comp_nodes
+ providers-common-proto
+ providers-common-udf_resolve
+ providers-generic-actors
+ providers-generic-provider
+ providers-dq-local_gateway
+ providers-dq-provider
+ dq-provider-exec
+ providers-pq-async_io
+ pq-gateway-native
+ providers-pq-provider
+ providers-s3-actors
+ providers-s3-provider
+ providers-solomon-gateway
+ providers-solomon-provider
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ providers-ydb-provider
+ udf-service-terminate_policy
+ yql-utils-backtrace
+ yql-utils-bindings
+ yql-utils-log
+ yql-core-url_preprocessing
+ yql-core-url_lister
+ yt-comp_nodes-dq
+ providers-yt-dq_task_preprocessor
+ yt-gateway-file
+ yt-gateway-native
+ providers-yt-mkql_dq
+ providers-yt-provider
+ yt-lib-yt_download
+ yt-lib-yt_url_lister
+ yt-lib-config_clusters
+ yql-parser-pg_wrapper
+ utils-log-proto
+ yql-utils-actor_system
+ fq-libs-actors
+ fq-libs-db_id_async_resolver_impl
+ clickhouse_client_udf
+)
+target_link_options(dqrun PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(dqrun PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dqrun/dqrun.cpp
+)
+target_allocator(dqrun
+ cpp-malloc-jemalloc
+)
+vcs_info(dqrun)
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.txt b/ydb/library/yql/tools/dqrun/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..59bf65b0381
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-tools-dqrun INTERFACE)
+target_link_libraries(yql-tools-dqrun INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
new file mode 100644
index 00000000000..82c121ac385
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -0,0 +1,932 @@
+#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
+#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.h>
+#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
+#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
+#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
+#include <ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h>
+#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
+#include <ydb/library/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
+#include <ydb/library/yql/providers/yt/lib/config_clusters/config_clusters.h>
+#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h>
+
+#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
+#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
+#include <ydb/library/yql/utils/actor_system/manager.h>
+
+#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h>
+#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h>
+#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
+#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h>
+#include <ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h>
+#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
+#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
+#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
+#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
+#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h>
+#include <ydb/library/yql/providers/function/gateway/dq_function_gateway.h>
+#include <ydb/library/yql/providers/function/provider/dq_function_provider.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
+#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
+#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
+#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
+#include <ydb/library/yql/providers/common/metrics/protos/metrics_registry.pb.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/mkql_utils.h>
+#include <ydb/library/yql/protos/yql_mount.pb.h>
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/core/file_storage/http_download/http_download.h>
+#include <ydb/library/yql/core/file_storage/file_storage.h>
+#include <ydb/library/yql/core/facade/yql_facade.h>
+#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/core/services/yql_out_transformers.h>
+#include <ydb/library/yql/core/url_lister/url_lister_manager.h>
+#include <ydb/library/yql/core/yql_library_compiler.h>
+#include <ydb/library/yql/utils/log/tls_backend.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+#include <ydb/library/yql/utils/bindings/utils.h>
+
+#include <ydb/core/fq/libs/actors/database_resolver.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
+#include <ydb/core/util/pb.h>
+
+#include <kikimr/yndx/cm_client/client.h>
+#include <yt/cpp/mapreduce/interface/init.h>
+
+#include <library/cpp/yson/public.h>
+#include <library/cpp/yson/writer.h>
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/logger/priority.h>
+#include <library/cpp/protobuf/util/pb_io.h>
+#include <library/cpp/actors/http/http_proxy.h>
+
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+#include <util/generic/scope.h>
+#include <util/generic/vector.h>
+#include <util/stream/output.h>
+#include <util/stream/file.h>
+#include <util/system/user.h>
+#include <util/system/env.h>
+#include <util/system/file.h>
+#include <util/string/strip.h>
+
+#ifdef PROFILE_MEMORY_ALLOCATIONS
+#include <library/cpp/lfalloc/alloc_profiler/profiler.h>
+#endif
+
+using namespace NKikimr;
+using namespace NYql;
+
+struct TRunOptions {
+ bool Sql = false;
+ TString User;
+ TMaybe<TString> BindingsFile;
+ NYson::EYsonFormat ResultsFormat;
+ bool OptimizeOnly = false;
+ bool PeepholeOnly = false;
+ bool TraceOpt = false;
+ IOutputStream* StatisticsStream = nullptr;
+ bool PrintPlan = false;
+ bool AnalyzeQuery = false;
+ bool AnsiLexer = false;
+ IOutputStream* ExprOut = nullptr;
+ IOutputStream* ResultOut = &Cout;
+ IOutputStream* ErrStream = &Cerr;
+ IOutputStream* TracePlan = &Cerr;
+};
+
+class TStoreMappingFunctor: public NLastGetopt::IOptHandler {
+public:
+ TStoreMappingFunctor(THashMap<TString, TString>* target, char delim = '@')
+ : Target(target)
+ , Delim(delim)
+ {
+ }
+
+ void HandleOpt(const NLastGetopt::TOptsParser* parser) final {
+ const TStringBuf val(parser->CurValOrDef());
+ const auto service = TString(val.After(Delim));
+ auto res = Target->emplace(TString(val.Before(Delim)), service);
+ if (!res.second) {
+ /// force replace already exist parameter
+ res.first->second = service;
+ }
+ }
+
+private:
+ THashMap<TString, TString>* Target;
+ char Delim;
+};
+
+void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config) {
+ auto configData = TFileInput(configFile ? configFile : "../../cfg/local/gateways.conf").ReadAll();
+
+ using ::google::protobuf::TextFormat;
+ if (!TextFormat::ParseFromString(configData, config)) {
+ ythrow yexception() << "Bad format of gateways configuration";
+ }
+}
+
+TFileStoragePtr CreateFS(const TString& paramsFile, const TString& defYtServer) {
+ TFileStorageConfig params;
+ LoadFsConfigFromFile(paramsFile ? paramsFile : "../../cfg/local/fs.conf", params);
+ return WithAsync(CreateFileStorage(params, {MakeYtDownloader(params, defYtServer)}));
+}
+
+void FillUsedFiles(
+ const TVector<TString>& filesMappingList,
+ TUserDataTable& filesMapping)
+{
+ for (auto& s : filesMappingList) {
+ TStringBuf fileName, filePath;
+ TStringBuf(s).Split('@', fileName, filePath);
+ if (fileName.empty() || filePath.empty()) {
+ ythrow yexception() << "Incorrect file mapping, expected form "
+ "name@path, e.g. MyFile@file.txt";
+ }
+
+ auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)];
+ entry.Type = EUserDataType::PATH;
+ entry.Data = filePath;
+ }
+}
+
+bool FillUsedUrls(
+ const TVector<TString>& urlMappingList,
+ TUserDataTable& filesMapping)
+{
+ for (auto& s : urlMappingList) {
+ TStringBuf name, url;
+ TStringBuf(s).Split('@', name, url);
+ if (name.empty() || url.empty()) {
+ Cerr << "Incorrect url mapping, expected form name@url, "
+ "e.g. MyUrl@http://example.com/file" << Endl;
+ return false;
+ }
+
+ auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)];
+ entry.Type = EUserDataType::URL;
+ entry.Data = url;
+ }
+ return true;
+}
+
+class TOptPipelineConfigurator : public IPipelineConfigurator {
+public:
+ TOptPipelineConfigurator(TProgramPtr prg, bool printPlan, IOutputStream* tracePlan)
+ : Program(std::move(prg)), PrintPlan(printPlan), TracePlan(tracePlan)
+ {
+ }
+
+ void AfterCreate(TTransformationPipeline* pipeline) const final {
+ Y_UNUSED(pipeline);
+ }
+
+ void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
+ pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE),
+ "OptTrace", TIssuesIds::CORE, "OptTrace");
+ }
+
+ void AfterOptimize(TTransformationPipeline* pipeline) const final {
+ if (PrintPlan) {
+ pipeline->Add(TPlanOutputTransformer::Sync(TracePlan, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput");
+ }
+ }
+
+private:
+ TProgramPtr Program;
+ bool PrintPlan;
+ IOutputStream* TracePlan;
+};
+
+NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway, NYql::NConnector::IClient::TPtr genericClient, size_t HTTPmaxTimeSeconds, size_t maxRetriesCount) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+ RegisterDqPqReadActorFactory(*factory, driver, nullptr);
+ RegisterYdbReadActorFactory(*factory, driver, nullptr);
+ RegisterS3ReadActorFactory(*factory, nullptr, httpGateway, GetHTTPDefaultRetryPolicy(TDuration::Seconds(HTTPmaxTimeSeconds), maxRetriesCount), {}, nullptr);
+ RegisterS3WriteActorFactory(*factory, nullptr, httpGateway);
+ RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
+ RegisterGenericReadActorFactory(*factory, nullptr, genericClient);
+
+ RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
+
+ return factory;
+}
+
+NActors::NLog::EPriority YqlToActorsLogLevel(NYql::NLog::ELevel yqlLevel) {
+ switch (yqlLevel) {
+ case NYql::NLog::ELevel::FATAL:
+ return NActors::NLog::PRI_CRIT;
+ case NYql::NLog::ELevel::ERROR:
+ return NActors::NLog::PRI_ERROR;
+ case NYql::NLog::ELevel::WARN:
+ return NActors::NLog::PRI_WARN;
+ case NYql::NLog::ELevel::INFO:
+ return NActors::NLog::PRI_INFO;
+ case NYql::NLog::ELevel::DEBUG:
+ return NActors::NLog::PRI_DEBUG;
+ case NYql::NLog::ELevel::TRACE:
+ return NActors::NLog::PRI_TRACE;
+ default:
+ ythrow yexception() << "unexpected level: " << int(yqlLevel);
+ }
+}
+
+struct TActorIds {
+ NActors::TActorId DatabaseResolver;
+ NActors::TActorId HttpProxy;
+};
+
+std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem(
+ const TGatewaysConfig& gatewaysConfig,
+ IMetricsRegistryPtr& metricsRegistry,
+ NYql::NLog::ELevel loggingLevel
+) {
+ auto actorSystemManager = std::make_unique<TActorSystemManager>(metricsRegistry, YqlToActorsLogLevel(loggingLevel));
+ TActorIds actorIds;
+
+ // Run actor system only if necessary
+ auto needActorSystem = gatewaysConfig.HasGeneric();
+ if (!needActorSystem) {
+ return std::make_tuple(std::move(actorSystemManager), std::move(actorIds));
+ }
+
+ // One can modify actor system setup via actorSystemManager->ApplySetupModifier().
+ // TODO: https://st.yandex-team.ru/YQL-16131
+ // This will be useful for DQ Gateway initialization refactoring.
+ actorSystemManager->Start();
+
+ // Actor system is initialized; start actor registration.
+ if (gatewaysConfig.HasGeneric()) {
+ auto httpProxy = NHttp::CreateHttpProxy();
+ actorIds.HttpProxy = actorSystemManager->GetActorSystem()->Register(httpProxy);
+
+ auto databaseResolver = NFq::CreateDatabaseResolver(actorIds.HttpProxy, nullptr);
+ actorIds.DatabaseResolver = actorSystemManager->GetActorSystem()->Register(databaseResolver);
+ }
+
+ return std::make_tuple(std::move(actorSystemManager), std::move(actorIds));
+}
+
+int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<TString, TString>& clusters) {
+ bool fail = true;
+ if (options.Sql) {
+ Cout << "Parse SQL..." << Endl;
+ NSQLTranslation::TTranslationSettings sqlSettings;
+ sqlSettings.ClusterMapping = clusters;
+ sqlSettings.SyntaxVersion = 1;
+ sqlSettings.AnsiLexer = options.AnsiLexer;
+ sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
+ sqlSettings.Flags.insert("DqEngineEnable");
+ if (!options.AnalyzeQuery) {
+ sqlSettings.Flags.insert("DqEngineForce");
+ }
+
+ if (options.BindingsFile) {
+ TFileInput input(*options.BindingsFile);
+ LoadBindings(sqlSettings.Bindings, input.ReadAll());
+ }
+
+ fail = !program->ParseSql(sqlSettings);
+ } else {
+ Cout << "Parse YQL..." << Endl;
+ fail = !program->ParseYql();
+ }
+ program->PrintErrorsTo(*options.ErrStream);
+ if (fail) {
+ return 1;
+ }
+ program->SetAbortHidden([](){
+ Cout << "hidden pseudo-aborted" << Endl;
+ });
+ Cout << "Compile program..." << Endl;
+ fail = !program->Compile(options.User);
+ program->PrintErrorsTo(*options.ErrStream);
+ if (options.TraceOpt) {
+ program->Print(&Cerr, nullptr);
+ }
+ if (fail) {
+ return 1;
+ }
+
+ TProgram::TStatus status = TProgram::TStatus::Error;
+ if (options.OptimizeOnly) {
+ Cout << "Optimize program..." << Endl;
+ auto config = TOptPipelineConfigurator(program, options.PrintPlan, options.TracePlan);
+ status = program->OptimizeWithConfig(options.User, config);
+ } else {
+ Cout << "Run program..." << Endl;
+ auto config = TOptPipelineConfigurator(program, options.PrintPlan, options.TracePlan);
+ status = program->RunWithConfig(options.User, config);
+ }
+ program->PrintErrorsTo(*options.ErrStream);
+ if (status == TProgram::TStatus::Error) {
+ if (options.TraceOpt) {
+ program->Print(&Cerr, nullptr);
+ }
+ return 1;
+ }
+ program->Print(options.ExprOut, options.TracePlan);
+
+ Cout << "Getting results..." << Endl;
+ if (program->HasResults()) {
+ NYson::TYsonWriter yson(options.ResultOut, options.ResultsFormat);
+ yson.OnBeginList();
+ for (const auto& result: program->Results()) {
+ yson.OnListItem();
+ yson.OnRaw(result);
+ }
+ yson.OnEndList();
+ }
+
+ if (options.StatisticsStream) {
+ if (auto st = program->GetStatistics(true)) {
+ TStringInput in(*st);
+ NYson::ReformatYsonStream(&in, options.StatisticsStream, NYson::EYsonFormat::Pretty);
+ }
+ }
+
+ Cout << Endl << "Done" << Endl;
+ return 0;
+}
+
+int RunMain(int argc, const char* argv[])
+{
+ TString gatewaysCfgFile;
+ TString progFile;
+ TVector<TString> tablesMappingList;
+ THashMap<TString, TString> tablesMapping;
+ TString user = GetUsername();
+ TString format;
+ TVector<TString> filesMappingList;
+ TVector<TString> urlMappingList;
+ TString exprFile;
+ TString resultFile;
+ TString planFile;
+ TString errFile;
+ TString paramsFile;
+ TString fileStorageCfg;
+ TVector<TString> udfsPaths;
+ TString udfsDir;
+ TMaybe<TString> dqHost;
+ TMaybe<int> dqPort;
+ int threads = 16;
+ TString tmpDir;
+ const bool hasValidate = false; // todo
+ THashSet<TString> gatewayTypes; // yqlrun compat, unused
+ ui16 syntaxVersion; // yqlrun compat, unused
+ bool emulateOutputForMultirun = false;
+ THashMap<TString, TString> clusterMapping;
+ THashSet<TString> sqlFlags;
+ IMetricsRegistryPtr metricsRegistry = CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq));
+ clusterMapping["plato"] = YtProviderName;
+
+ TString mountConfig;
+ TString mestricsPusherConfig;
+ TString udfResolver;
+ bool udfResolverFilterSyscalls = false;
+ TString statFile;
+ TString metricsFile;
+ int verbosity = 3;
+ bool showLog = false;
+ bool emulateYt = false;
+ TString token = GetEnv("YQL_TOKEN");
+ if (!token) {
+ TString home = GetEnv("HOME");
+ auto tokenPath = TFsPath(home) / ".yql" / "token";
+ if (tokenPath.Exists()) {
+ token = StripStringRight(TFileInput(tokenPath).ReadAll());
+ }
+ }
+ THashMap<TString, TString> customTokens;
+ TString folderId;
+
+ TRunOptions runOptions;
+
+ NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+ opts.AddLongOption('p', "program", "Program to execute (use '-' to read from stdin)")
+ .Required()
+ .RequiredArgument("FILE")
+ .StoreResult(&progFile);
+ opts.AddLongOption('s', "sql", "Program is SQL query")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.Sql);
+ opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList);
+ opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
+ opts.AddLongOption('u', "user", "MR user")
+ .Optional()
+ .RequiredArgument("USER")
+ .StoreResult(&user);
+ opts.AddLongOption("format", "results format, one of { binary | text | pretty }")
+ .Optional()
+ .RequiredArgument("STR")
+ .DefaultValue("text")
+ .StoreResult(&format);
+ opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList);
+ opts.AddLongOption("url", "name@url").AppendTo(&urlMappingList);
+ opts.AddLongOption("gateways-cfg", "gateways configuration file")
+ .Optional()
+ .RequiredArgument("FILE")
+ .StoreResult(&gatewaysCfgFile);
+ opts.AddLongOption("fs-cfg", "Path to file storage config")
+ .Optional()
+ .StoreResult(&fileStorageCfg);
+ opts.AddLongOption("udf", "Load shared library with UDF by given path")
+ .AppendTo(&udfsPaths);
+ opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found"
+ " in given directory")
+ .StoreResult(&udfsDir);
+ opts.AddLongOption('O', "optimize", "optimize expression")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.OptimizeOnly);
+ opts.AddLongOption("peephole", "perform peephole optimization")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.PeepholeOnly);
+ opts.AddLongOption("trace-opt", "print AST in the begin of each transformation")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.TraceOpt);
+ opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument();
+ opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult<TString>(&exprFile);
+ opts.AddLongOption("result-file", "print program execution result to file").StoreResult<TString>(&resultFile);
+ opts.AddLongOption("plan-file", "print program plan to file").StoreResult<TString>(&planFile);
+ opts.AddLongOption("err-file", "print validate/optimize/runtime errors to file").StoreResult<TString>(&errFile);
+ opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(&paramsFile);
+ opts.AddLongOption("tmp-dir", "directory for temporary tables").StoreResult<TString>(&tmpDir);
+ opts.AddLongOption('G', "gateways", "used gateways").SplitHandler(&gatewayTypes, ',').DefaultValue(DqProviderName);
+ opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ',');
+ opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&syntaxVersion).DefaultValue(1);
+ opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig);
+ opts.AddLongOption('R',"run", "run expression using input/output tables").NoArgument(); // yqlrun compat
+ opts.AddLongOption('L', "show-log", "show transformation log")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&showLog);
+ opts.AddLongOption("udf-resolver", "Path to udf-resolver")
+ .Optional()
+ .RequiredArgument("PATH")
+ .StoreResult(&udfResolver);
+ opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&udfResolverFilterSyscalls);
+ opts.AddLongOption('v', "verbosity", "Log verbosity level")
+ .Optional()
+ .RequiredArgument("LEVEL")
+ .DefaultValue("6")
+ .StoreResult(&verbosity);
+ opts.AddLongOption("token", "YQL token")
+ .Optional()
+ .RequiredArgument("VALUE")
+ .StoreResult(&token);
+ opts.AddLongOption("custom-tokens", "Custom tokens")
+ .Optional()
+ .RequiredArgument("NAME=VALUE or NAME=@PATH")
+ .KVHandler([&customTokens](TString key, TString value) {
+ if (value.StartsWith('@')) {
+ customTokens[key] = StripStringRight(TFileInput(value.substr(1)).ReadAll());
+ } else {
+ customTokens[key] = value;
+ }
+ });
+ opts.AddLongOption("folderId", "Yandex Cloud folder ID (resolve objects inside this folder)")
+ .Optional()
+ .RequiredArgument("VALUE")
+ .StoreResult(&folderId);
+ opts.AddLongOption("stat", "Print execution statistics")
+ .Optional()
+ .OptionalArgument("FILE")
+ .StoreResult(&statFile);
+ opts.AddLongOption("metrics", "Print execution metrics")
+ .Optional()
+ .OptionalArgument("FILE")
+ .StoreResult(&metricsFile);
+ opts.AddLongOption("print-plan", "Print basic and detailed plan")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.PrintPlan);
+ opts.AddLongOption("keep-temp", "keep temporary tables").NoArgument();
+ opts.AddLongOption("analyze-query", "enable analyze query").Optional().NoArgument().SetFlag(&runOptions.AnalyzeQuery);
+ opts.AddLongOption("ansi-lexer", "Use ansi lexer").Optional().NoArgument().SetFlag(&runOptions.AnsiLexer);
+ opts.AddLongOption('E', "emulate-yt", "Emulate YT tables").Optional().NoArgument().SetFlag(&emulateYt);
+
+ opts.AddLongOption("dq-host", "Dq Host");
+ opts.AddLongOption("dq-port", "Dq Port");
+ opts.AddLongOption("threads", "Threads");
+ opts.AddLongOption("bindings-file", "Bindings File")
+ .StoreResult(&runOptions.BindingsFile);
+ opts.AddLongOption("metrics-pusher-config", "Metrics Pusher Config")
+ .StoreResult(&mestricsPusherConfig);
+ opts.AddHelpOption('h');
+
+ opts.SetFreeArgsNum(0);
+
+ NLastGetopt::TOptsParseResult res(&opts, argc, argv);
+
+ if (runOptions.PeepholeOnly) {
+ Cerr << "Peephole optimization is not supported yet" << Endl;
+ return 1;
+ }
+
+ if (res.Has("dq-host")) {
+ dqHost = res.Get<TString>("dq-host");
+ }
+ if (res.Has("dq-port")) {
+ dqPort = res.Get<int>("dq-port");
+ }
+ if (res.Has("threads")) {
+ threads = res.Get<int>("threads");
+ }
+
+ THolder<TFixedBufferFileOutput> exprFileHolder;
+ if (res.Has("print-expr")) {
+ runOptions.ExprOut = &Cout;
+ } else if (!exprFile.empty()) {
+ exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile));
+ runOptions.ExprOut = exprFileHolder.Get();
+ }
+ THolder<TFixedBufferFileOutput> errFileHolder;
+ if (!errFile.empty()) {
+ errFileHolder.Reset(new TFixedBufferFileOutput(errFile));
+ runOptions.ErrStream = errFileHolder.Get();
+ }
+ THolder<TFixedBufferFileOutput> resultFileHolder;
+ if (!resultFile.empty()) {
+ resultFileHolder.Reset(new TFixedBufferFileOutput(resultFile));
+ runOptions.ResultOut = resultFileHolder.Get();
+ }
+ THolder<TFixedBufferFileOutput> planFileHolder;
+ if (!planFile.empty()) {
+ planFileHolder.Reset(new TFixedBufferFileOutput(planFile));
+ runOptions.TracePlan = planFileHolder.Get();
+ }
+
+ for (auto& s: tablesMappingList) {
+ TStringBuf tableName, filePath;
+ TStringBuf(s).Split('@', tableName, filePath);
+ if (tableName.empty() || filePath.empty()) {
+ Cerr << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt" << Endl;
+ return 1;
+ }
+ tablesMapping[tableName] = filePath;
+ }
+
+ // Reinit logger with new level
+ NYql::NLog::ELevel loggingLevel = NYql::NLog::ELevelHelpers::FromInt(verbosity);
+ if (verbosity != LOG_DEF_PRIORITY) {
+ NYql::NLog::EComponentHelpers::ForEach([loggingLevel](NYql::NLog::EComponent c) {
+ NYql::NLog::YqlLogger().SetComponentLevel(c, loggingLevel);
+ });
+ }
+
+ if (runOptions.TraceOpt) {
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE);
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CoreEval, NYql::NLog::ELevel::TRACE);
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CorePeepHole, NYql::NLog::ELevel::TRACE);
+ } else if (showLog) {
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::DEBUG);
+ }
+
+ YQL_LOG(INFO) << "dqrun ABI version: " << NUdf::CurrentAbiVersionStr();
+
+ if (emulateYt && dqPort) {
+ YQL_LOG(ERROR) << "Remote DQ instance cannot work with the emulated YT cluster";
+ return 1;
+ }
+
+ runOptions.ResultsFormat =
+ (format == TStringBuf("binary")) ? NYson::EYsonFormat::Binary
+ : (format == TStringBuf("text")) ? NYson::EYsonFormat::Text
+ : NYson::EYsonFormat::Pretty;
+
+ runOptions.User = user;
+
+ TUserDataTable dataTable;
+ FillUsedFiles(filesMappingList, dataTable);
+ FillUsedUrls(urlMappingList, dataTable);
+
+ NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths);
+ auto funcRegistry = NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
+ NKikimr::NMiniKQL::FillStaticModules(*funcRegistry);
+
+ TGatewaysConfig gatewaysConfig;
+ ReadGatewaysConfig(gatewaysCfgFile, &gatewaysConfig);
+ if (runOptions.AnalyzeQuery) {
+ auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings();
+ setting->SetName("AnalyzeQuery");
+ setting->SetValue("1");
+ }
+
+ TString defYtServer = gatewaysConfig.HasYt() ? NYql::TConfigClusters::GetDefaultYtServer(gatewaysConfig.GetYt()) : TString();
+ auto storage = CreateFS(fileStorageCfg, defYtServer);
+
+ TVector<TIntrusivePtr<TThrRefBase>> gateways;
+ THashMap<TString, TString> clusters;
+ TVector<TDataProviderInitializer> dataProvidersInit;
+
+ const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
+ NYdb::TDriver driver(driverConfig);
+
+ Y_DEFER {
+ driver.Stop(true);
+ };
+
+ TVector<NKikimr::NMiniKQL::TComputationNodeFactory> factories = {
+ GetDqYtFactory(),
+ GetDqYdbFactory(driver),
+ GetCommonDqFactory(),
+ NMiniKQL::GetYqlFactory(),
+ GetPgFactory()
+ };
+
+ if (emulateYt) {
+ auto ytFileServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, storage, tmpDir, res.Has("keep-temp"));
+ for (auto& cluster: gatewaysConfig.GetYt().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{YtProviderName});
+ }
+ factories.push_back(GetYtFileFactory(ytFileServices));
+ clusters["plato"] = YtProviderName;
+ auto ytNativeGateway = CreateYtFileGateway(ytFileServices, &emulateOutputForMultirun);
+ dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
+ } else if (gatewaysConfig.HasYt()) {
+ TYtNativeServices ytServices;
+ ytServices.FunctionRegistry = funcRegistry.Get();
+ ytServices.FileStorage = storage;
+ ytServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt());
+ auto ytNativeGateway = CreateYtNativeGateway(ytServices);
+ gateways.emplace_back(ytNativeGateway);
+
+ for (auto& cluster: gatewaysConfig.GetYt().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{YtProviderName});
+ }
+ dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
+ }
+
+ auto dqCompFactory = NMiniKQL::GetCompositeWithBuiltinFactory(factories);
+
+ // Actor system starts here and will be automatically destroyed when goes out of the scope.
+ std::unique_ptr<TActorSystemManager> actorSystemManager;
+ TActorIds actorIds;
+ std::tie(actorSystemManager, actorIds) = RunActorSystem(gatewaysConfig, metricsRegistry, loggingLevel);
+
+ IHTTPGateway::TPtr httpGateway;
+ if (gatewaysConfig.HasClickHouse()) {
+ for (auto& cluster: gatewaysConfig.GetClickHouse().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{ClickHouseProviderName});
+ }
+ if (!httpGateway) {
+ httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
+ }
+ dataProvidersInit.push_back(GetClickHouseDataProviderInitializer(httpGateway));
+ }
+
+ NConnector::IClient::TPtr genericClient;
+ if (gatewaysConfig.HasGeneric()) {
+ for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{GenericProviderName});
+ }
+
+ genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector());
+ auto dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
+ actorSystemManager->GetActorSystem(),
+ actorIds.DatabaseResolver,
+ "",
+ gatewaysConfig.GetGeneric().GetMdbGateway(),
+ NFq::MakeMdbEndpointGeneratorGeneric(false)
+ );
+ dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver));
+ }
+
+ if (gatewaysConfig.HasYdb()) {
+ for (auto& cluster: gatewaysConfig.GetYdb().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{YdbProviderName});
+ }
+ dataProvidersInit.push_back(GetYdbDataProviderInitializer(driver));
+ }
+
+ if (gatewaysConfig.HasS3()) {
+ for (auto& cluster: gatewaysConfig.GetS3().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{S3ProviderName});
+ }
+ if (!httpGateway) {
+ httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
+ }
+ dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true));
+ }
+
+ if (gatewaysConfig.HasPq()) {
+ TPqGatewayServices pqServices(
+ driver,
+ nullptr,
+ nullptr, // credentials factory
+ std::make_shared<TPqGatewayConfig>(gatewaysConfig.GetPq()),
+ funcRegistry.Get()
+ );
+ auto pqGateway = CreatePqNativeGateway(pqServices);
+ gateways.emplace_back(pqGateway);
+ for (auto& cluster: gatewaysConfig.GetPq().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{PqProviderName});
+ }
+ dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway));
+ }
+
+ if (gatewaysConfig.HasSolomon()) {
+ auto solomonConfig = gatewaysConfig.GetSolomon();
+ auto solomonGateway = CreateSolomonGateway(solomonConfig);
+
+ gateways.emplace_back(solomonGateway);
+ dataProvidersInit.push_back(NYql::GetSolomonDataProviderInitializer(solomonGateway, false));
+ for (const auto& cluster: gatewaysConfig.GetSolomon().GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), TString{NYql::SolomonProviderName});
+ }
+ }
+
+ std::function<NActors::IActor*(void)> metricsPusherFactory = {};
+
+ {
+ TIntrusivePtr<IDqGateway> dqGateway;
+ if (dqPort) {
+ dqGateway = CreateDqGateway(dqHost.GetOrElse("localhost"), *dqPort);
+ } else {
+ auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({
+ CreateCommonDqTaskTransformFactory(),
+ CreateYtDqTaskTransformFactory(),
+ CreateYdbDqTaskTransformFactory()
+ });
+
+ TDqTaskPreprocessorFactoryCollection dqTaskPreprocessorFactories = {
+ NDq::CreateYtDqTaskPreprocessorFactory(emulateYt, funcRegistry)
+ };
+
+ size_t requestTimeout = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasRequestTimeoutSeconds() ? gatewaysConfig.GetHttpGateway().GetRequestTimeoutSeconds() : 100;
+ size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2;
+
+ dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories,
+ CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads,
+ metricsRegistry,
+ metricsPusherFactory);
+ }
+
+ gateways.emplace_back(dqGateway);
+ dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
+ }
+
+ TExprContext ctx;
+ IModuleResolver::TPtr moduleResolver;
+ if (!mountConfig.empty()) {
+ TModulesTable modules;
+ NYqlMountConfig::TMountConfig mount;
+ Y_VERIFY(NKikimr::ParsePBFromFile(mountConfig, &mount));
+ FillUserDataTableFromFileSystem(mount, dataTable);
+
+ if (!CompileLibraries(dataTable, ctx, modules)) {
+ *runOptions.ErrStream << "Errors on compile libraries:" << Endl;
+ ctx.IssueManager.GetIssues().PrintTo(*runOptions.ErrStream);
+ return -1;
+ }
+
+ moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags, hasValidate);
+ } else {
+ if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusters)) {
+ *runOptions.ErrStream << "Errors loading default YQL libraries:" << Endl;
+ ctx.IssueManager.GetIssues().PrintTo(*runOptions.ErrStream);
+ return 1;
+ }
+ }
+
+ TExprContext::TFreezeGuard freezeGuard(ctx);
+
+ TProgramFactory progFactory(emulateYt, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "dqrun");
+ progFactory.AddUserDataTable(std::move(dataTable));
+ progFactory.SetModules(moduleResolver);
+ if (udfResolver) {
+ progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
+ udfResolver, {}, {}, udfResolverFilterSyscalls, {}));
+ } else {
+ progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true));
+ }
+ progFactory.SetFileStorage(storage);
+ progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig));
+ progFactory.SetGatewaysConfig(&gatewaysConfig);
+ TCredentials::TPtr creds = MakeIntrusive<TCredentials>();
+ if (token) {
+ if (!emulateYt) {
+ creds->AddCredential("default_yt", TCredential("yt", "", token));
+ }
+ creds->AddCredential("default_ydb", TCredential("ydb", "", token));
+ creds->AddCredential("default_pq", TCredential("pq", "", token));
+ creds->AddCredential("default_s3", TCredential("s3", "", token));
+ creds->AddCredential("default_solomon", TCredential("solomon", "", token));
+ creds->AddCredential("default_generic", TCredential("generic", "", token));
+ }
+ if (!customTokens.empty()) {
+ for (auto& [key, value]: customTokens) {
+ creds->AddCredential(key, TCredential("custom", "", value));
+ }
+ }
+ progFactory.SetCredentials(creds);
+
+ progFactory.SetUrlListerManager(
+ MakeUrlListerManager(
+ {MakeYtUrlLister()}
+ )
+ );
+
+ TProgramPtr program;
+ if (progFile == TStringBuf("-")) {
+ program = progFactory.Create("-stdin-", Cin.ReadAll());
+ } else {
+ program = progFactory.Create(TFile(progFile, RdOnly));
+ program->SetQueryName(progFile);
+ }
+ if (paramsFile) {
+ TString parameters = TFileInput(paramsFile).ReadAll();
+ program->SetParametersYson(parameters);
+ }
+
+ if (!emulateYt) {
+ program->EnableResultPosition();
+ }
+
+ THolder<IOutputStream> statStreamHolder;
+ if (res.Has("stat")) {
+ if (statFile) {
+ statStreamHolder = MakeHolder<TFileOutput>(statFile);
+ runOptions.StatisticsStream = statStreamHolder.Get();
+ } else {
+ runOptions.StatisticsStream = &Cerr;
+ }
+ }
+
+ int result = RunProgram(std::move(program), runOptions, clusters);
+ if (res.Has("metrics")) {
+ NProto::TMetricsRegistrySnapshot snapshot;
+ snapshot.SetDontIncrement(true);
+ metricsRegistry->TakeSnapshot(&snapshot);
+ auto output = MakeHolder<TFileOutput>(metricsFile);
+ SerializeToTextFormat(snapshot, *output.Get());
+ }
+
+ return result;
+}
+
+int main(int argc, const char* argv[])
+{
+ Y_UNUSED(NUdf::GetStaticSymbols());
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+ NYql::NBacktrace::EnableKikimrSymbolize();
+
+ NYT::Initialize(argc, argv);
+
+ // Instead of hardcoding logging level, use CLI args:
+ // ./dqrun ... -v 6 <- INFO
+ // ./dqrun ... -v 7 <- DEBUG
+ // ./dqrun ... -v 8 <- TRACE
+ auto loggerConfig = NYql::NProto::TLoggingConfig();
+ NYql::NLog::InitLogger(loggerConfig, false);
+
+ auto oldBackend = NYql::NLog::YqlLogger().ReleaseBackend();
+ NYql::NLog::YqlLogger().ResetBackend(THolder(new NYql::NLog::TTlsLogBackend(oldBackend)));
+
+ //NYql::NLog::YqlLoggerScope logger(&Cerr);
+
+ try {
+#ifdef PROFILE_MEMORY_ALLOCATIONS
+ NAllocProfiler::StartAllocationSampling(true);
+#endif
+ const int res = RunMain(argc, argv);
+#ifdef PROFILE_MEMORY_ALLOCATIONS
+ NAllocProfiler::StopAllocationSampling(Cout);
+#endif
+ return res;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ return 1;
+ }
+}
diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make
new file mode 100644
index 00000000000..c904d4617a9
--- /dev/null
+++ b/ydb/library/yql/tools/dqrun/ya.make
@@ -0,0 +1,90 @@
+IF (NOT OS_WINDOWS)
+ PROGRAM()
+
+IF (PROFILE_MEMORY_ALLOCATIONS)
+ ALLOCATOR(LF_DBG)
+ CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS)
+ELSE()
+ ALLOCATOR(J)
+ENDIF()
+
+ SRCS(
+ dqrun.cpp
+ )
+
+ PEERDIR(
+ contrib/libs/protobuf
+ ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs
+ library/cpp/actors/http
+ library/cpp/getopt
+ library/cpp/lfalloc/alloc_profiler
+ library/cpp/logger
+ library/cpp/resource
+ library/cpp/yson
+ yt/cpp/mapreduce/interface
+ ydb/library/yql/sql/pg
+ ydb/library/yql/core/facade
+ ydb/library/yql/core/file_storage
+ ydb/library/yql/core/file_storage/proto
+ ydb/library/yql/core/file_storage/http_download
+ ydb/library/yql/core/services
+ ydb/library/yql/core/services/mounts
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/integration/transform
+ ydb/library/yql/dq/transform
+ ydb/library/yql/minikql/comp_nodes/llvm
+ ydb/library/yql/minikql/invoke_builtins/llvm
+ ydb/library/yql/providers/clickhouse/actors
+ ydb/library/yql/providers/clickhouse/provider
+ ydb/library/yql/providers/common/comp_nodes
+ ydb/library/yql/providers/common/proto
+ ydb/library/yql/providers/common/udf_resolve
+ ydb/library/yql/providers/generic/actors
+ ydb/library/yql/providers/generic/provider
+ ydb/library/yql/providers/dq/local_gateway
+ ydb/library/yql/providers/dq/provider
+ ydb/library/yql/providers/dq/provider/exec
+ ydb/library/yql/providers/pq/async_io
+ ydb/library/yql/providers/pq/gateway/native
+ ydb/library/yql/providers/pq/provider
+ ydb/library/yql/providers/s3/actors
+ ydb/library/yql/providers/s3/provider
+ ydb/library/yql/providers/solomon/gateway
+ ydb/library/yql/providers/solomon/provider
+ ydb/library/yql/providers/ydb/actors
+ ydb/library/yql/providers/ydb/comp_nodes
+ ydb/library/yql/providers/ydb/provider
+
+ ydb/library/yql/public/udf/service/terminate_policy
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/utils/bindings
+ ydb/library/yql/utils/log
+ ydb/library/yql/core/url_preprocessing
+ ydb/library/yql/core/url_lister
+ ydb/library/yql/providers/yt/comp_nodes/dq
+ ydb/library/yql/providers/yt/dq_task_preprocessor
+ ydb/library/yql/providers/yt/gateway/file
+ ydb/library/yql/providers/yt/gateway/native
+ ydb/library/yql/providers/yt/mkql_dq
+ ydb/library/yql/providers/yt/provider
+ ydb/library/yql/providers/yt/lib/yt_download
+ ydb/library/yql/providers/yt/lib/yt_url_lister
+ ydb/library/yql/providers/yt/lib/config_clusters
+ ydb/library/yql/parser/pg_wrapper
+ ydb/library/yql/utils/log/proto
+
+ ydb/library/yql/utils/actor_system
+ ydb/core/fq/libs/actors
+ ydb/core/fq/libs/db_id_async_resolver_impl
+
+ ydb/library/yql/udfs/common/clickhouse/client
+ )
+
+ YQL_LAST_ABI_VERSION()
+
+ END()
+ELSE()
+ LIBRARY()
+
+ END()
+ENDIF()
diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make
index a17f84573b4..2841fa69adc 100644
--- a/ydb/library/yql/tools/ya.make
+++ b/ydb/library/yql/tools/ya.make
@@ -1,6 +1,7 @@
RECURSE(
astdiff
+ dqrun
mrjob
- yqlrun
sql2yql
+ yqlrun
)