aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-10-25 00:20:50 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-10-25 00:38:23 +0300
commit67ad455d45c5960b956357712a30e3b03d443342 (patch)
tree513ba5eab81bd77e257337ef7461763408aebeb7
parent69b890018688b9b26e048d80009e94af5f67803d (diff)
downloadydb-67ad455d45c5960b956357712a30e3b03d443342.tar.gz
Introduce OSS version of control utility for service_node/working_node (dq testing utilities)
-rw-r--r--.mapping.json10
-rw-r--r--ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.linux-aarch64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.linux-x86_64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.txt16
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.txt2
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt (renamed from ydb/library/yql/tools/CMakeLists.windows-x86_64.txt)7
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt38
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt41
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt43
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt17
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt31
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/main.cpp495
-rw-r--r--ydb/library/yql/tools/dq/dq_cli/ya.make19
-rw-r--r--ydb/library/yql/tools/dq/ya.make1
18 files changed, 704 insertions, 64 deletions
diff --git a/.mapping.json b/.mapping.json
index 94d161bbc7..31e35d3301 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -8366,11 +8366,7 @@
"ydb/library/yql/sql/v1/ut/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/sql/v1/ut/CMakeLists.txt":"",
"ydb/library/yql/sql/v1/ut/CMakeLists.windows-x86_64.txt":"",
- "ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt":"",
- "ydb/library/yql/tools/CMakeLists.linux-aarch64.txt":"",
- "ydb/library/yql/tools/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/tools/CMakeLists.txt":"",
- "ydb/library/yql/tools/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.linux-x86_64.txt":"",
@@ -8380,6 +8376,12 @@
"ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/tools/dq/CMakeLists.txt":"",
+ "ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt":"",
+ "ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index 61ab79236b..0000000000
--- a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(astdiff)
-add_subdirectory(dq)
-add_subdirectory(dqrun)
-add_subdirectory(mrjob)
-add_subdirectory(sql2yql)
-add_subdirectory(sql_formatter)
-add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 61ab79236b..0000000000
--- a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(astdiff)
-add_subdirectory(dq)
-add_subdirectory(dqrun)
-add_subdirectory(mrjob)
-add_subdirectory(sql2yql)
-add_subdirectory(sql_formatter)
-add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 61ab79236b..0000000000
--- a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(astdiff)
-add_subdirectory(dq)
-add_subdirectory(dqrun)
-add_subdirectory(mrjob)
-add_subdirectory(sql2yql)
-add_subdirectory(sql_formatter)
-add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt
index f8b31df0c1..61ab79236b 100644
--- a/ydb/library/yql/tools/CMakeLists.txt
+++ b/ydb/library/yql/tools/CMakeLists.txt
@@ -6,12 +6,10 @@
# original buildsystem will not be accepted.
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
+add_subdirectory(astdiff)
+add_subdirectory(dq)
+add_subdirectory(dqrun)
+add_subdirectory(mrjob)
+add_subdirectory(sql2yql)
+add_subdirectory(sql_formatter)
+add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt
index bb8870384d..d21d3f9f9e 100644
--- a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt
@@ -6,5 +6,6 @@
# original buildsystem will not be accepted.
+add_subdirectory(dq_cli)
add_subdirectory(service_node)
add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt
index bb8870384d..d21d3f9f9e 100644
--- a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt
@@ -6,5 +6,6 @@
# original buildsystem will not be accepted.
+add_subdirectory(dq_cli)
add_subdirectory(service_node)
add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt
index bb8870384d..d21d3f9f9e 100644
--- a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt
@@ -6,5 +6,6 @@
# original buildsystem will not be accepted.
+add_subdirectory(dq_cli)
add_subdirectory(service_node)
add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.txt b/ydb/library/yql/tools/dq/CMakeLists.txt
index 606ff46b4b..f8b31df0c1 100644
--- a/ydb/library/yql/tools/dq/CMakeLists.txt
+++ b/ydb/library/yql/tools/dq/CMakeLists.txt
@@ -10,6 +10,8 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarc
include(CMakeLists.linux-aarch64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
include(CMakeLists.linux-x86_64.txt)
endif()
diff --git a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt
index bd1f049b04..af9aac9cb0 100644
--- a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt
@@ -6,9 +6,4 @@
# original buildsystem will not be accepted.
-add_subdirectory(astdiff)
-add_subdirectory(dqrun)
-add_subdirectory(mrjob)
-add_subdirectory(sql2yql)
-add_subdirectory(sql_formatter)
-add_subdirectory(yqlrun)
+add_subdirectory(dq_cli)
diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..aaa1da1d54
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dq_cli)
+target_link_libraries(dq_cli PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-protobuf-util
+ cpp-threading-future
+ library-yql-utils
+ api-protos
+ public-lib-yson_value
+ dq-api-grpc
+ providers-dq-common
+)
+target_link_options(dq_cli PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(dq_cli PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp
+)
+target_allocator(dq_cli
+ system_allocator
+)
+vcs_info(dq_cli)
diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..0caf90835f
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dq_cli)
+target_link_libraries(dq_cli PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-protobuf-util
+ cpp-threading-future
+ library-yql-utils
+ api-protos
+ public-lib-yson_value
+ dq-api-grpc
+ providers-dq-common
+)
+target_link_options(dq_cli PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(dq_cli PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp
+)
+target_allocator(dq_cli
+ cpp-malloc-jemalloc
+)
+vcs_info(dq_cli)
diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..140fa5efe3
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,43 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dq_cli)
+target_link_libraries(dq_cli PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-protobuf-util
+ cpp-threading-future
+ library-yql-utils
+ api-protos
+ public-lib-yson_value
+ dq-api-grpc
+ providers-dq-common
+)
+target_link_options(dq_cli PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(dq_cli PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp
+)
+target_allocator(dq_cli
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(dq_cli)
diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..18f6217842
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(dq_cli)
+target_link_libraries(dq_cli PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-protobuf-util
+ cpp-threading-future
+ library-yql-utils
+ api-protos
+ public-lib-yson_value
+ dq-api-grpc
+ providers-dq-common
+)
+target_sources(dq_cli PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp
+)
+target_allocator(dq_cli
+ system_allocator
+)
+vcs_info(dq_cli)
diff --git a/ydb/library/yql/tools/dq/dq_cli/main.cpp b/ydb/library/yql/tools/dq/dq_cli/main.cpp
new file mode 100644
index 0000000000..e7780fbe8b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/main.cpp
@@ -0,0 +1,495 @@
+#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h>
+#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/protobuf/util/pb_io.h>
+
+#include <util/string/builder.h>
+#include <util/system/env.h>
+#include <util/system/file.h>
+#include <util/stream/file.h>
+#include <util/generic/guid.h>
+#include <util/string/split.h>
+
+using namespace NGrpc;
+using namespace Yql::DqsProto;
+using namespace NYql;
+
+int SvnRevision(TServiceConnection<DqService>& service, const TVector<TString>& args) {
+ if (args.size() != 1) {
+ Cerr << "Suspicious extra args" << Endl;
+ }
+
+ auto promise = NThreading::NewPromise<int>();
+ auto callback = [&](TGrpcStatus&& status, SvnRevisionResponse&& resp) {
+ if (status.Ok()) {
+ Cout << resp.GetRevision() << Endl;
+ promise.SetValue(0);
+ } else {
+ Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
+ promise.SetValue(status.GRpcStatusCode);
+ }
+ };
+
+ service.DoRequest<SvnRevisionRequest, SvnRevisionResponse>(SvnRevisionRequest(), callback,
+ &DqService::Stub::AsyncSvnRevision);
+ return promise.GetFuture().GetValueSync();
+}
+
+ClusterStatusResponse Info(TServiceConnection<DqService>& service) {
+ auto promise = NThreading::NewPromise<ClusterStatusResponse>();
+ auto callback = [&](TGrpcStatus&& status, ClusterStatusResponse&& resp) {
+ if (status.Ok()) {
+ promise.SetValue(resp);
+ } else {
+ Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
+ promise.SetException("Error");
+ }
+ };
+
+ service.DoRequest<ClusterStatusRequest, ClusterStatusResponse>(
+ ClusterStatusRequest(),
+ callback,
+ &DqService::Stub::AsyncClusterStatus);
+ return promise.GetFuture().GetValueSync();
+}
+
+void Stop(TServiceConnection<DqService>& service, const JobStopRequest& request)
+{
+ auto promise = NThreading::NewPromise<void>();
+ auto callback = [&](TGrpcStatus&& status, JobStopResponse&& ) {
+ if (status.Ok()) {
+ promise.SetValue();
+ } else {
+ Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
+ promise.SetValue();
+ }
+ };
+
+ service.DoRequest<JobStopRequest, JobStopResponse>(
+ request,
+ callback,
+ &DqService::Stub::AsyncJobStop);
+ promise.GetFuture().GetValueSync();
+ return;
+}
+
+void ClusterUpgrade(
+ TServiceConnection<DqService>& service,
+ const TString& revision,
+ const TString& cluster,
+ bool force)
+{
+ Y_UNUSED(force);
+
+ auto status = Info(service);
+ int totalWorkers = 0;
+ int started = 0;
+ for (const auto& worker : status.GetWorker()) {
+ if (worker.GetClusterName() == cluster) {
+ if (worker.GetRevision() == revision) {
+ started ++;
+ } else {
+ totalWorkers ++;
+ }
+ }
+ }
+
+ Cerr << "Total workers of new revision " << revision << " : " << started << Endl;
+ Cerr << "Stopping " << totalWorkers << " workers" << Endl;
+
+ while (true) {
+ int started = 0;
+ int stopping = 0;
+ int pending = 0;
+ int ready = 0;
+ for (const auto& worker : status.GetWorker()) {
+ if (worker.GetDead()) {
+ continue;
+ }
+ if (worker.GetClusterName() == cluster)
+ {
+ if (worker.GetRevision() == revision) {
+ started ++ ;
+ if (worker.GetDownloadList().size() == 0) {
+ ready ++;
+ }
+ } else {
+ pending ++;
+ }
+ if (worker.GetStopping()) {
+ stopping ++;
+ }
+ }
+ }
+
+ Cerr << "Stopping/Pending/Started/Ready/Total "
+ << stopping << "/"
+ << pending << "/"
+ << started << "/"
+ << ready << "/"
+ << totalWorkers << Endl;
+
+ if (pending == 0 && stopping== 0 && ready >= 0.8 * totalWorkers) {
+ break;
+ }
+
+ JobStopRequest request;
+ request.SetRevision(revision);
+ request.SetNegativeRevision(true);
+ request.SetForce(force);
+ request.SetClusterName(cluster);
+
+ Stop(service, request);
+
+ Sleep(TDuration::Seconds(10));
+
+ status = Info(service);
+ }
+ Cerr << "Done" << Endl;
+}
+
+void PerClusterUpgrade(TServiceConnection<DqService>& service, const TVector<TString>& args) {
+ TString revision;
+ bool force = false;
+
+ if (args.size() < 3) {
+ Cerr << "Usage: percluster_upgrade revision hahn,arnold [force]" << Endl;
+ return;
+ }
+
+ revision = args[1];
+
+ if (args.size() > 3) {
+ force = args[3] == "force";
+ }
+
+ for (const auto& it : StringSplitter(args[2]).Split(',')) {
+ Cerr << "Stop " << it.Token() << Endl;
+
+ ClusterUpgrade(service, revision, TString(it.Token()), force);
+ }
+}
+
+void Stop(TServiceConnection<DqService>& service, const TVector<TString>& args, bool force) {
+ if (args.size() != 2) {
+ Cerr << "Suspicious extra args" << Endl;
+ }
+
+ JobStopRequest request;
+ TStringInput inputStream1(args[1]);
+ ParseFromTextFormat(inputStream1, request, EParseFromTextFormatOption::AllowUnknownField);
+ request.SetForce(force);
+ Stop(service, request);
+}
+
+void OperationStop(TServiceConnection<DqService>& service, const TVector<TString>& args)
+{
+ if (args.size() != 2) {
+ Cerr << "Suspicious extra args" << Endl;
+ }
+ TString operationId = args[1];
+
+ auto promise = NThreading::NewPromise<void>();
+ auto callback = [&](TGrpcStatus&& status, OperationStopResponse&& ) {
+ if (status.Ok()) {
+ promise.SetValue();
+ } else {
+ Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
+ promise.SetValue();
+ }
+ };
+
+ OperationStopRequest request;
+ request.SetOperationId(operationId);
+
+ service.DoRequest<OperationStopRequest, OperationStopResponse>(
+ request,
+ callback,
+ &DqService::Stub::AsyncOperationStop);
+ promise.GetFuture().GetValueSync();
+ return;
+}
+
+void SmartStop(TServiceConnection<DqService>& service, const TVector<TString>& args, bool negative = false) {
+ if (args.size() != 2) {
+ Cerr << "Suspicious extra args" << Endl; return;
+ }
+ if (args.size() < 2) {
+ Cerr << "Revision requied" << Endl; return;
+ }
+
+ TString revision = args[1];
+ auto status = Info(service);
+ auto totalWorkers = status.GetWorker().size();
+ int maxOperations = -1;
+
+ while (true) {
+ THashSet<TString> operations;
+ status = Info(service);
+ for (const auto& worker : status.GetWorker()) {
+ if (negative == (worker.GetRevision() == revision)) {
+ continue;
+ }
+ if (worker.GetDead()) {
+ continue;
+ }
+ // TString OPERATIONID_ATTR = "yql_operation_id"; // TODO
+ for (const auto& attr : worker.GetAttribute()) {
+ if (attr.GetKey() == NYql::NCommonAttrs::OPERATIONID_ATTR) {
+ operations.emplace(attr.GetValue());
+ }
+ }
+ }
+ if (operations.empty()) {
+ Cerr << "Done" << Endl;
+ break;
+ }
+ maxOperations = Max<int>(maxOperations, operations.size());
+ if (totalWorkers * 0.9 <= status.GetWorker().size()) {
+ auto operationId = *operations.begin();
+ Cerr << "Stopping operation " << operationId << Endl;
+ JobStopRequest request;
+ request.SetRevision(revision);
+ request.SetNegativeRevision(negative);
+ auto* attr = request.AddAttribute();
+ attr->SetKey(NYql::NCommonAttrs::OPERATIONID_ATTR);
+ attr->SetValue(operationId);
+ Stop(service, request);
+ }
+ Cerr << "Operations: " << (maxOperations - operations.size()) << "/" << maxOperations << Endl;
+ Sleep(TDuration::Seconds(10));
+ }
+
+ JobStopRequest request;
+ request.SetRevision(revision);
+ request.SetNegativeRevision(negative);
+
+ auto promise = NThreading::NewPromise<void>();
+ auto callback = [&](TGrpcStatus&& status, JobStopResponse&& ) {
+ if (status.Ok()) {
+ promise.SetValue();
+ } else {
+ Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
+ promise.SetValue();
+ }
+ };
+
+ service.DoRequest<JobStopRequest, JobStopResponse>(
+ request,
+ callback,
+ &DqService::Stub::AsyncJobStop);
+ promise.GetFuture().GetValueSync();
+ return;
+}
+
+void OpenSession(TServiceConnection<DqService>& service, const TString& sessionId, const TString& username) {
+ Yql::DqsProto::OpenSessionRequest request;
+ request.SetSession(sessionId);
+ request.SetUsername(username);
+
+ auto promise = NThreading::NewPromise<void>();
+ auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable {
+ Y_UNUSED(resp);
+ if (status.Ok()) {
+ promise.SetValue();
+ } else {
+ promise.SetException(status.Msg);
+ }
+ };
+
+ service.DoRequest<Yql::DqsProto::OpenSessionRequest, Yql::DqsProto::OpenSessionResponse>(
+ request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession);
+ promise.GetFuture().GetValueSync();
+}
+
+void CloseSession(TServiceConnection<DqService>& service, const TString& sessionId) {
+ Yql::DqsProto::CloseSessionRequest request;
+ request.SetSession(sessionId);
+
+ auto callback = [](NGrpc::TGrpcStatus&& status, Yql::DqsProto::CloseSessionResponse&& resp) {
+ Y_UNUSED(resp);
+ Y_UNUSED(status);
+ };
+
+ service.DoRequest<Yql::DqsProto::CloseSessionRequest, Yql::DqsProto::CloseSessionResponse>(
+ request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession);
+}
+
+Yql::DqsProto::RoutesResponse Routes(TServiceConnection<DqService>& service, const TString& nodeIdStr) {
+ Yql::DqsProto::RoutesRequest request;
+ int nodeId = 0;
+ TryFromString(nodeIdStr, nodeId);
+ request.SetNodeId(nodeId);
+
+ auto promise = NThreading::NewPromise<Yql::DqsProto::RoutesResponse>();
+ auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::RoutesResponse&& resp) mutable {
+ Y_UNUSED(resp);
+ if (status.Ok()) {
+ promise.SetValue(resp);
+ } else {
+ promise.SetException(status.Msg);
+ }
+ };
+
+ service.DoRequest<Yql::DqsProto::RoutesRequest, Yql::DqsProto::RoutesResponse>(
+ request, callback, &Yql::DqsProto::DqService::Stub::AsyncRoutes);
+ return promise.GetFuture().GetValueSync();
+}
+
+Yql::DqsProto::BenchmarkResponse Bench(TServiceConnection<DqService>& service, const TVector<TString>& args) {
+ Yql::DqsProto::BenchmarkRequest request;
+ request.SetWorkerCount(10);
+ request.SetInflight(10);
+ request.SetTotalRequests(10000);
+
+ if (args.size() > 1) {
+ TStringInput inputStream1(args[1]);
+ ParseFromTextFormat(inputStream1, request, EParseFromTextFormatOption::AllowUnknownField);
+ }
+
+ auto promise = NThreading::NewPromise<Yql::DqsProto::BenchmarkResponse>();
+
+ auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::BenchmarkResponse&& resp) mutable
+ {
+ Y_UNUSED(resp);
+ if (status.Ok()) {
+ promise.SetValue(resp);
+ } else {
+ promise.SetException(status.Msg);
+ }
+ };
+
+ service.DoRequest<Yql::DqsProto::BenchmarkRequest, Yql::DqsProto::BenchmarkResponse>(
+ request, callback, &Yql::DqsProto::DqService::Stub::AsyncBenchmark);
+ return promise.GetFuture().GetValueSync();
+}
+
+int main(int argc, char** argv) {
+ using namespace NLastGetopt;
+ TOpts opts = TOpts::Default();
+ opts.AddHelpOption();
+ opts.AddLongOption('p', "port", "Grpc port");
+ opts.AddLongOption('h', "host", "Grpc host");
+ opts.SetFreeArgTitle(0, "command", R"(query or svnrevision)");
+ opts.SetFreeArgTitle(1, "<query>", R"(SQL code goes here)");
+ opts.SetFreeArgTitle(2, "endpoint", "endpoint of the remote database if the query is remote");
+ opts.SetFreeArgTitle(3, "database", "path to the remote database on the endpoint");
+ opts.SetFreeArgsMin(1);
+
+ TOptsParseResult res(&opts, argc, argv);
+
+ ui16 port = [&]() {
+ if (res.Has("port")) {
+ return res.Get<ui16>("port");
+ } else {
+ ::TFile file("/var/tmp/dq_grpc_port", OpenExisting | RdOnly);
+ TString buffer;
+ buffer.resize(file.GetLength());
+ file.Load(&buffer[0], buffer.size());
+ return FromString<ui16>(buffer);
+ }
+ } ();
+ TString host = [&]() {
+ if (res.Has("host")) {
+ return res.Get<TString>("host");
+ } else {
+ return TString("localhost");
+ }
+ } ();
+ auto args = res.GetFreeArgs();
+ const auto& command = args[0];
+
+ TGRpcClientConfig grpcConf(TStringBuilder() << host << ":" << port);
+ TGRpcClientLow grpcClient(2);
+ auto conn = grpcClient.CreateGRpcServiceConnection<DqService>(grpcConf);
+
+ if (command == "svnrevision") {
+ return SvnRevision(*conn, args);
+ }
+
+ if (command == "info") {
+ auto status = Info(*conn);
+ TString responseStr;
+ {
+ TStringOutput output1(responseStr);
+ SerializeToTextFormat(status, output1);
+ }
+
+ Cout << responseStr << Endl;
+
+ return 0;
+ }
+
+ if (command == "operation_stop") {
+ OperationStop(*conn, args);
+ return 0;
+ }
+
+ if (command == "stop") {
+ Stop(*conn, args, false);
+ return 0;
+ }
+
+ if (command == "force_stop") {
+ Stop(*conn, args, true);
+ return 0;
+ }
+
+ if (command == "smart_stop") {
+ SmartStop(*conn, args, false);
+ return 0;
+ }
+
+ if (command == "upgrade") {
+ SmartStop(*conn, args, true);
+ return 0;
+ }
+
+ if (command == "percluster_upgrade") {
+ PerClusterUpgrade(*conn, args);
+ return 0;
+ }
+
+ if (command == "open_session") {
+ OpenSession(*conn, args[1], args[2]);
+ return 0;
+ }
+
+ if (command == "close_session") {
+ CloseSession(*conn, args[1]);
+ return 0;
+ }
+
+ if (command == "routes") {
+ auto status = Routes(*conn, args[1]);
+ TString responseStr;
+ {
+ TStringOutput output1(responseStr);
+ SerializeToTextFormat(status, output1);
+ }
+
+ Cout << responseStr << Endl;
+ return 0;
+ }
+
+ if (command == "bench") {
+ auto status = Bench(*conn, args);
+ TString responseStr;
+ {
+ TStringOutput output1(responseStr);
+ SerializeToTextFormat(status, output1);
+ }
+
+ Cout << responseStr << Endl;
+ return 0;
+ }
+
+ Cerr << "Unexpected command. Try --help." << Endl;
+ return 1;
+}
diff --git a/ydb/library/yql/tools/dq/dq_cli/ya.make b/ydb/library/yql/tools/dq/dq_cli/ya.make
new file mode 100644
index 0000000000..e700b9b26b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/dq_cli/ya.make
@@ -0,0 +1,19 @@
+PROGRAM()
+
+PEERDIR(
+ library/cpp/getopt
+ library/cpp/grpc/client
+ library/cpp/protobuf/util
+ library/cpp/threading/future
+ ydb/library/yql/utils
+ ydb/public/api/protos
+ ydb/public/lib/yson_value
+ ydb/library/yql/providers/dq/api/grpc
+ ydb/library/yql/providers/dq/common
+)
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make
index 68eba94d95..1043bcb327 100644
--- a/ydb/library/yql/tools/dq/ya.make
+++ b/ydb/library/yql/tools/dq/ya.make
@@ -1,4 +1,5 @@
RECURSE(
+ dq_cli
service_node
worker_node
)