diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-10-25 00:20:50 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-10-25 00:38:23 +0300 |
commit | 67ad455d45c5960b956357712a30e3b03d443342 (patch) | |
tree | 513ba5eab81bd77e257337ef7461763408aebeb7 | |
parent | 69b890018688b9b26e048d80009e94af5f67803d (diff) | |
download | ydb-67ad455d45c5960b956357712a30e3b03d443342.tar.gz |
Introduce OSS version of control utility for service_node/working_node (dq testing utilities)
18 files changed, 704 insertions, 64 deletions
diff --git a/.mapping.json b/.mapping.json index 94d161bbc7..31e35d3301 100644 --- a/.mapping.json +++ b/.mapping.json @@ -8366,11 +8366,7 @@ "ydb/library/yql/sql/v1/ut/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/sql/v1/ut/CMakeLists.txt":"", "ydb/library/yql/sql/v1/ut/CMakeLists.windows-x86_64.txt":"", - "ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt":"", - "ydb/library/yql/tools/CMakeLists.linux-aarch64.txt":"", - "ydb/library/yql/tools/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/tools/CMakeLists.txt":"", - "ydb/library/yql/tools/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.linux-x86_64.txt":"", @@ -8380,6 +8376,12 @@ "ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/tools/dq/CMakeLists.txt":"", + "ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt":"", + "ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index 61ab79236b..0000000000 --- a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -add_subdirectory(astdiff) -add_subdirectory(dq) -add_subdirectory(dqrun) -add_subdirectory(mrjob) -add_subdirectory(sql2yql) -add_subdirectory(sql_formatter) -add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt deleted file mode 100644 index 61ab79236b..0000000000 --- a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -add_subdirectory(astdiff) -add_subdirectory(dq) -add_subdirectory(dqrun) -add_subdirectory(mrjob) -add_subdirectory(sql2yql) -add_subdirectory(sql_formatter) -add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt deleted file mode 100644 index 61ab79236b..0000000000 --- a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -add_subdirectory(astdiff) -add_subdirectory(dq) -add_subdirectory(dqrun) -add_subdirectory(mrjob) -add_subdirectory(sql2yql) -add_subdirectory(sql_formatter) -add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt index f8b31df0c1..61ab79236b 100644 --- a/ydb/library/yql/tools/CMakeLists.txt +++ b/ydb/library/yql/tools/CMakeLists.txt @@ -6,12 +6,10 @@ # original buildsystem will not be accepted. -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() +add_subdirectory(astdiff) +add_subdirectory(dq) +add_subdirectory(dqrun) +add_subdirectory(mrjob) +add_subdirectory(sql2yql) +add_subdirectory(sql_formatter) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt index bb8870384d..d21d3f9f9e 100644 --- a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt @@ -6,5 +6,6 @@ # original buildsystem will not be accepted. +add_subdirectory(dq_cli) add_subdirectory(service_node) add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt index bb8870384d..d21d3f9f9e 100644 --- a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt @@ -6,5 +6,6 @@ # original buildsystem will not be accepted. +add_subdirectory(dq_cli) add_subdirectory(service_node) add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt index bb8870384d..d21d3f9f9e 100644 --- a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt @@ -6,5 +6,6 @@ # original buildsystem will not be accepted. +add_subdirectory(dq_cli) add_subdirectory(service_node) add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.txt b/ydb/library/yql/tools/dq/CMakeLists.txt index 606ff46b4b..f8b31df0c1 100644 --- a/ydb/library/yql/tools/dq/CMakeLists.txt +++ b/ydb/library/yql/tools/dq/CMakeLists.txt @@ -10,6 +10,8 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarc include(CMakeLists.linux-aarch64.txt) elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) include(CMakeLists.linux-x86_64.txt) endif() diff --git a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt index bd1f049b04..af9aac9cb0 100644 --- a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/tools/dq/CMakeLists.windows-x86_64.txt @@ -6,9 +6,4 @@ # original buildsystem will not be accepted. -add_subdirectory(astdiff) -add_subdirectory(dqrun) -add_subdirectory(mrjob) -add_subdirectory(sql2yql) -add_subdirectory(sql_formatter) -add_subdirectory(yqlrun) +add_subdirectory(dq_cli) diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..aaa1da1d54 --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,38 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dq_cli) +target_link_libraries(dq_cli PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-getopt + cpp-grpc-client + cpp-protobuf-util + cpp-threading-future + library-yql-utils + api-protos + public-lib-yson_value + dq-api-grpc + providers-dq-common +) +target_link_options(dq_cli PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(dq_cli PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp +) +target_allocator(dq_cli + system_allocator +) +vcs_info(dq_cli) diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..0caf90835f --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-aarch64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dq_cli) +target_link_libraries(dq_cli PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-getopt + cpp-grpc-client + cpp-protobuf-util + cpp-threading-future + library-yql-utils + api-protos + public-lib-yson_value + dq-api-grpc + providers-dq-common +) +target_link_options(dq_cli PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(dq_cli PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp +) +target_allocator(dq_cli + cpp-malloc-jemalloc +) +vcs_info(dq_cli) diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..140fa5efe3 --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.linux-x86_64.txt @@ -0,0 +1,43 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dq_cli) +target_link_libraries(dq_cli PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-getopt + cpp-grpc-client + cpp-protobuf-util + cpp-threading-future + library-yql-utils + api-protos + public-lib-yson_value + dq-api-grpc + providers-dq-common +) +target_link_options(dq_cli PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(dq_cli PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp +) +target_allocator(dq_cli + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(dq_cli) diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..18f6217842 --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/CMakeLists.windows-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(dq_cli) +target_link_libraries(dq_cli PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-getopt + cpp-grpc-client + cpp-protobuf-util + cpp-threading-future + library-yql-utils + api-protos + public-lib-yson_value + dq-api-grpc + providers-dq-common +) +target_sources(dq_cli PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/dq_cli/main.cpp +) +target_allocator(dq_cli + system_allocator +) +vcs_info(dq_cli) diff --git a/ydb/library/yql/tools/dq/dq_cli/main.cpp b/ydb/library/yql/tools/dq/dq_cli/main.cpp new file mode 100644 index 0000000000..e7780fbe8b --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/main.cpp @@ -0,0 +1,495 @@ +#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> +#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/grpc/client/grpc_client_low.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/protobuf/util/pb_io.h> + +#include <util/string/builder.h> +#include <util/system/env.h> +#include <util/system/file.h> +#include <util/stream/file.h> +#include <util/generic/guid.h> +#include <util/string/split.h> + +using namespace NGrpc; +using namespace Yql::DqsProto; +using namespace NYql; + +int SvnRevision(TServiceConnection<DqService>& service, const TVector<TString>& args) { + if (args.size() != 1) { + Cerr << "Suspicious extra args" << Endl; + } + + auto promise = NThreading::NewPromise<int>(); + auto callback = [&](TGrpcStatus&& status, SvnRevisionResponse&& resp) { + if (status.Ok()) { + Cout << resp.GetRevision() << Endl; + promise.SetValue(0); + } else { + Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl; + promise.SetValue(status.GRpcStatusCode); + } + }; + + service.DoRequest<SvnRevisionRequest, SvnRevisionResponse>(SvnRevisionRequest(), callback, + &DqService::Stub::AsyncSvnRevision); + return promise.GetFuture().GetValueSync(); +} + +ClusterStatusResponse Info(TServiceConnection<DqService>& service) { + auto promise = NThreading::NewPromise<ClusterStatusResponse>(); + auto callback = [&](TGrpcStatus&& status, ClusterStatusResponse&& resp) { + if (status.Ok()) { + promise.SetValue(resp); + } else { + Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl; + promise.SetException("Error"); + } + }; + + service.DoRequest<ClusterStatusRequest, ClusterStatusResponse>( + ClusterStatusRequest(), + callback, + &DqService::Stub::AsyncClusterStatus); + return promise.GetFuture().GetValueSync(); +} + +void Stop(TServiceConnection<DqService>& service, const JobStopRequest& request) +{ + auto promise = NThreading::NewPromise<void>(); + auto callback = [&](TGrpcStatus&& status, JobStopResponse&& ) { + if (status.Ok()) { + promise.SetValue(); + } else { + Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl; + promise.SetValue(); + } + }; + + service.DoRequest<JobStopRequest, JobStopResponse>( + request, + callback, + &DqService::Stub::AsyncJobStop); + promise.GetFuture().GetValueSync(); + return; +} + +void ClusterUpgrade( + TServiceConnection<DqService>& service, + const TString& revision, + const TString& cluster, + bool force) +{ + Y_UNUSED(force); + + auto status = Info(service); + int totalWorkers = 0; + int started = 0; + for (const auto& worker : status.GetWorker()) { + if (worker.GetClusterName() == cluster) { + if (worker.GetRevision() == revision) { + started ++; + } else { + totalWorkers ++; + } + } + } + + Cerr << "Total workers of new revision " << revision << " : " << started << Endl; + Cerr << "Stopping " << totalWorkers << " workers" << Endl; + + while (true) { + int started = 0; + int stopping = 0; + int pending = 0; + int ready = 0; + for (const auto& worker : status.GetWorker()) { + if (worker.GetDead()) { + continue; + } + if (worker.GetClusterName() == cluster) + { + if (worker.GetRevision() == revision) { + started ++ ; + if (worker.GetDownloadList().size() == 0) { + ready ++; + } + } else { + pending ++; + } + if (worker.GetStopping()) { + stopping ++; + } + } + } + + Cerr << "Stopping/Pending/Started/Ready/Total " + << stopping << "/" + << pending << "/" + << started << "/" + << ready << "/" + << totalWorkers << Endl; + + if (pending == 0 && stopping== 0 && ready >= 0.8 * totalWorkers) { + break; + } + + JobStopRequest request; + request.SetRevision(revision); + request.SetNegativeRevision(true); + request.SetForce(force); + request.SetClusterName(cluster); + + Stop(service, request); + + Sleep(TDuration::Seconds(10)); + + status = Info(service); + } + Cerr << "Done" << Endl; +} + +void PerClusterUpgrade(TServiceConnection<DqService>& service, const TVector<TString>& args) { + TString revision; + bool force = false; + + if (args.size() < 3) { + Cerr << "Usage: percluster_upgrade revision hahn,arnold [force]" << Endl; + return; + } + + revision = args[1]; + + if (args.size() > 3) { + force = args[3] == "force"; + } + + for (const auto& it : StringSplitter(args[2]).Split(',')) { + Cerr << "Stop " << it.Token() << Endl; + + ClusterUpgrade(service, revision, TString(it.Token()), force); + } +} + +void Stop(TServiceConnection<DqService>& service, const TVector<TString>& args, bool force) { + if (args.size() != 2) { + Cerr << "Suspicious extra args" << Endl; + } + + JobStopRequest request; + TStringInput inputStream1(args[1]); + ParseFromTextFormat(inputStream1, request, EParseFromTextFormatOption::AllowUnknownField); + request.SetForce(force); + Stop(service, request); +} + +void OperationStop(TServiceConnection<DqService>& service, const TVector<TString>& args) +{ + if (args.size() != 2) { + Cerr << "Suspicious extra args" << Endl; + } + TString operationId = args[1]; + + auto promise = NThreading::NewPromise<void>(); + auto callback = [&](TGrpcStatus&& status, OperationStopResponse&& ) { + if (status.Ok()) { + promise.SetValue(); + } else { + Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl; + promise.SetValue(); + } + }; + + OperationStopRequest request; + request.SetOperationId(operationId); + + service.DoRequest<OperationStopRequest, OperationStopResponse>( + request, + callback, + &DqService::Stub::AsyncOperationStop); + promise.GetFuture().GetValueSync(); + return; +} + +void SmartStop(TServiceConnection<DqService>& service, const TVector<TString>& args, bool negative = false) { + if (args.size() != 2) { + Cerr << "Suspicious extra args" << Endl; return; + } + if (args.size() < 2) { + Cerr << "Revision requied" << Endl; return; + } + + TString revision = args[1]; + auto status = Info(service); + auto totalWorkers = status.GetWorker().size(); + int maxOperations = -1; + + while (true) { + THashSet<TString> operations; + status = Info(service); + for (const auto& worker : status.GetWorker()) { + if (negative == (worker.GetRevision() == revision)) { + continue; + } + if (worker.GetDead()) { + continue; + } + // TString OPERATIONID_ATTR = "yql_operation_id"; // TODO + for (const auto& attr : worker.GetAttribute()) { + if (attr.GetKey() == NYql::NCommonAttrs::OPERATIONID_ATTR) { + operations.emplace(attr.GetValue()); + } + } + } + if (operations.empty()) { + Cerr << "Done" << Endl; + break; + } + maxOperations = Max<int>(maxOperations, operations.size()); + if (totalWorkers * 0.9 <= status.GetWorker().size()) { + auto operationId = *operations.begin(); + Cerr << "Stopping operation " << operationId << Endl; + JobStopRequest request; + request.SetRevision(revision); + request.SetNegativeRevision(negative); + auto* attr = request.AddAttribute(); + attr->SetKey(NYql::NCommonAttrs::OPERATIONID_ATTR); + attr->SetValue(operationId); + Stop(service, request); + } + Cerr << "Operations: " << (maxOperations - operations.size()) << "/" << maxOperations << Endl; + Sleep(TDuration::Seconds(10)); + } + + JobStopRequest request; + request.SetRevision(revision); + request.SetNegativeRevision(negative); + + auto promise = NThreading::NewPromise<void>(); + auto callback = [&](TGrpcStatus&& status, JobStopResponse&& ) { + if (status.Ok()) { + promise.SetValue(); + } else { + Cerr << "Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl; + promise.SetValue(); + } + }; + + service.DoRequest<JobStopRequest, JobStopResponse>( + request, + callback, + &DqService::Stub::AsyncJobStop); + promise.GetFuture().GetValueSync(); + return; +} + +void OpenSession(TServiceConnection<DqService>& service, const TString& sessionId, const TString& username) { + Yql::DqsProto::OpenSessionRequest request; + request.SetSession(sessionId); + request.SetUsername(username); + + auto promise = NThreading::NewPromise<void>(); + auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::OpenSessionResponse&& resp) mutable { + Y_UNUSED(resp); + if (status.Ok()) { + promise.SetValue(); + } else { + promise.SetException(status.Msg); + } + }; + + service.DoRequest<Yql::DqsProto::OpenSessionRequest, Yql::DqsProto::OpenSessionResponse>( + request, callback, &Yql::DqsProto::DqService::Stub::AsyncOpenSession); + promise.GetFuture().GetValueSync(); +} + +void CloseSession(TServiceConnection<DqService>& service, const TString& sessionId) { + Yql::DqsProto::CloseSessionRequest request; + request.SetSession(sessionId); + + auto callback = [](NGrpc::TGrpcStatus&& status, Yql::DqsProto::CloseSessionResponse&& resp) { + Y_UNUSED(resp); + Y_UNUSED(status); + }; + + service.DoRequest<Yql::DqsProto::CloseSessionRequest, Yql::DqsProto::CloseSessionResponse>( + request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession); +} + +Yql::DqsProto::RoutesResponse Routes(TServiceConnection<DqService>& service, const TString& nodeIdStr) { + Yql::DqsProto::RoutesRequest request; + int nodeId = 0; + TryFromString(nodeIdStr, nodeId); + request.SetNodeId(nodeId); + + auto promise = NThreading::NewPromise<Yql::DqsProto::RoutesResponse>(); + auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::RoutesResponse&& resp) mutable { + Y_UNUSED(resp); + if (status.Ok()) { + promise.SetValue(resp); + } else { + promise.SetException(status.Msg); + } + }; + + service.DoRequest<Yql::DqsProto::RoutesRequest, Yql::DqsProto::RoutesResponse>( + request, callback, &Yql::DqsProto::DqService::Stub::AsyncRoutes); + return promise.GetFuture().GetValueSync(); +} + +Yql::DqsProto::BenchmarkResponse Bench(TServiceConnection<DqService>& service, const TVector<TString>& args) { + Yql::DqsProto::BenchmarkRequest request; + request.SetWorkerCount(10); + request.SetInflight(10); + request.SetTotalRequests(10000); + + if (args.size() > 1) { + TStringInput inputStream1(args[1]); + ParseFromTextFormat(inputStream1, request, EParseFromTextFormatOption::AllowUnknownField); + } + + auto promise = NThreading::NewPromise<Yql::DqsProto::BenchmarkResponse>(); + + auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::BenchmarkResponse&& resp) mutable + { + Y_UNUSED(resp); + if (status.Ok()) { + promise.SetValue(resp); + } else { + promise.SetException(status.Msg); + } + }; + + service.DoRequest<Yql::DqsProto::BenchmarkRequest, Yql::DqsProto::BenchmarkResponse>( + request, callback, &Yql::DqsProto::DqService::Stub::AsyncBenchmark); + return promise.GetFuture().GetValueSync(); +} + +int main(int argc, char** argv) { + using namespace NLastGetopt; + TOpts opts = TOpts::Default(); + opts.AddHelpOption(); + opts.AddLongOption('p', "port", "Grpc port"); + opts.AddLongOption('h', "host", "Grpc host"); + opts.SetFreeArgTitle(0, "command", R"(query or svnrevision)"); + opts.SetFreeArgTitle(1, "<query>", R"(SQL code goes here)"); + opts.SetFreeArgTitle(2, "endpoint", "endpoint of the remote database if the query is remote"); + opts.SetFreeArgTitle(3, "database", "path to the remote database on the endpoint"); + opts.SetFreeArgsMin(1); + + TOptsParseResult res(&opts, argc, argv); + + ui16 port = [&]() { + if (res.Has("port")) { + return res.Get<ui16>("port"); + } else { + ::TFile file("/var/tmp/dq_grpc_port", OpenExisting | RdOnly); + TString buffer; + buffer.resize(file.GetLength()); + file.Load(&buffer[0], buffer.size()); + return FromString<ui16>(buffer); + } + } (); + TString host = [&]() { + if (res.Has("host")) { + return res.Get<TString>("host"); + } else { + return TString("localhost"); + } + } (); + auto args = res.GetFreeArgs(); + const auto& command = args[0]; + + TGRpcClientConfig grpcConf(TStringBuilder() << host << ":" << port); + TGRpcClientLow grpcClient(2); + auto conn = grpcClient.CreateGRpcServiceConnection<DqService>(grpcConf); + + if (command == "svnrevision") { + return SvnRevision(*conn, args); + } + + if (command == "info") { + auto status = Info(*conn); + TString responseStr; + { + TStringOutput output1(responseStr); + SerializeToTextFormat(status, output1); + } + + Cout << responseStr << Endl; + + return 0; + } + + if (command == "operation_stop") { + OperationStop(*conn, args); + return 0; + } + + if (command == "stop") { + Stop(*conn, args, false); + return 0; + } + + if (command == "force_stop") { + Stop(*conn, args, true); + return 0; + } + + if (command == "smart_stop") { + SmartStop(*conn, args, false); + return 0; + } + + if (command == "upgrade") { + SmartStop(*conn, args, true); + return 0; + } + + if (command == "percluster_upgrade") { + PerClusterUpgrade(*conn, args); + return 0; + } + + if (command == "open_session") { + OpenSession(*conn, args[1], args[2]); + return 0; + } + + if (command == "close_session") { + CloseSession(*conn, args[1]); + return 0; + } + + if (command == "routes") { + auto status = Routes(*conn, args[1]); + TString responseStr; + { + TStringOutput output1(responseStr); + SerializeToTextFormat(status, output1); + } + + Cout << responseStr << Endl; + return 0; + } + + if (command == "bench") { + auto status = Bench(*conn, args); + TString responseStr; + { + TStringOutput output1(responseStr); + SerializeToTextFormat(status, output1); + } + + Cout << responseStr << Endl; + return 0; + } + + Cerr << "Unexpected command. Try --help." << Endl; + return 1; +} diff --git a/ydb/library/yql/tools/dq/dq_cli/ya.make b/ydb/library/yql/tools/dq/dq_cli/ya.make new file mode 100644 index 0000000000..e700b9b26b --- /dev/null +++ b/ydb/library/yql/tools/dq/dq_cli/ya.make @@ -0,0 +1,19 @@ +PROGRAM() + +PEERDIR( + library/cpp/getopt + library/cpp/grpc/client + library/cpp/protobuf/util + library/cpp/threading/future + ydb/library/yql/utils + ydb/public/api/protos + ydb/public/lib/yson_value + ydb/library/yql/providers/dq/api/grpc + ydb/library/yql/providers/dq/common +) + +SRCS( + main.cpp +) + +END() diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make index 68eba94d95..1043bcb327 100644 --- a/ydb/library/yql/tools/dq/ya.make +++ b/ydb/library/yql/tools/dq/ya.make @@ -1,4 +1,5 @@ RECURSE( + dq_cli service_node worker_node ) |