diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-04-05 17:22:35 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-04-05 17:22:35 +0300 |
commit | 54ad0af8cfe99687f95779e2d97d3e6ad6b350d4 (patch) | |
tree | 15803c7c3914e85fc5033a5426abb55d50bf4516 | |
parent | ee97871cc4cea65efe6f12af3599773ed6a7833a (diff) | |
download | ydb-54ad0af8cfe99687f95779e2d97d3e6ad6b350d4.tar.gz |
workload topic
40 files changed, 1580 insertions, 33 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt index 0a0a67946b..05899f2c44 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(topic_workload) get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency @@ -25,6 +26,7 @@ target_link_libraries(clicommands PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic @@ -79,6 +81,7 @@ target_link_libraries(clicommands.global PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt index 9ea855179b..a3cb431493 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(topic_workload) get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency @@ -26,6 +27,7 @@ target_link_libraries(clicommands PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic @@ -81,6 +83,7 @@ target_link_libraries(clicommands.global PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt index 9ea855179b..a3cb431493 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(topic_workload) get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency @@ -26,6 +27,7 @@ target_link_libraries(clicommands PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic @@ -81,6 +83,7 @@ target_link_libraries(clicommands.global PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt index 0a0a67946b..05899f2c44 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(topic_workload) get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency @@ -25,6 +26,7 @@ target_link_libraries(clicommands PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic @@ -79,6 +81,7 @@ target_link_libraries(clicommands.global PUBLIC ydb-library-workload public-lib-operation_id common + topic_workload lib-ydb_cli-dump lib-ydb_cli-import topic diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..ce4312b20f --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(topic_workload) +target_link_libraries(topic_workload PUBLIC + contrib-libs-cxxsupp + yutil + yql-public-issue + public-issue-protos + api-grpc + api-protos + api-protos-annotations + public-lib-operation_id + lib-operation_id-protos + cpp-client-draft + cpp-client-ydb_driver + cpp-client-ydb_proto + cpp-client-ydb_table + cpp-client-ydb_topic + client-ydb_types-operation + client-ydb_types-status +) +target_sources(topic_workload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp +) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..1499a10225 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt @@ -0,0 +1,42 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(topic_workload) +target_link_libraries(topic_workload PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + yql-public-issue + public-issue-protos + api-grpc + api-protos + api-protos-annotations + public-lib-operation_id + lib-operation_id-protos + cpp-client-draft + cpp-client-ydb_driver + cpp-client-ydb_proto + cpp-client-ydb_table + cpp-client-ydb_topic + client-ydb_types-operation + client-ydb_types-status +) +target_sources(topic_workload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp +) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..1499a10225 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt @@ -0,0 +1,42 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(topic_workload) +target_link_libraries(topic_workload PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + yql-public-issue + public-issue-protos + api-grpc + api-protos + api-protos-annotations + public-lib-operation_id + lib-operation_id-protos + cpp-client-draft + cpp-client-ydb_driver + cpp-client-ydb_proto + cpp-client-ydb_table + cpp-client-ydb_topic + client-ydb_types-operation + client-ydb_types-status +) +target_sources(topic_workload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp +) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt new file mode 100644 index 0000000000..a692f90f36 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64") + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..ce4312b20f --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(topic_workload) +target_link_libraries(topic_workload PUBLIC + contrib-libs-cxxsupp + yutil + yql-public-issue + public-issue-protos + api-grpc + api-protos + api-protos-annotations + public-lib-operation_id + lib-operation_id-protos + cpp-client-draft + cpp-client-ydb_driver + cpp-client-ydb_proto + cpp-client-ydb_table + cpp-client-ydb_topic + client-ydb_types-operation + client-ydb_types-status +) +target_sources(topic_workload PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp +) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp new file mode 100644 index 0000000000..765da9f9c3 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp @@ -0,0 +1,26 @@ +#include "topic_workload.h" + +#include "topic_workload_defines.h" +#include "topic_workload_clean.h" +#include "topic_workload_init.h" +#include "topic_workload_run_read.h" +#include "topic_workload_run_write.h" +#include "topic_workload_run_full.h" + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopic::TCommandWorkloadTopic() + : TClientCommandTree("topic", {}, "YDB topic workload") +{ + AddCommand(std::make_unique<TCommandWorkloadTopicInit>()); + AddCommand(std::make_unique<TCommandWorkloadTopicClean>()); + AddCommand(std::make_unique<TCommandWorkloadTopicRun>()); +} + +TCommandWorkloadTopicRun::TCommandWorkloadTopicRun() + : TClientCommandTree("run", {}, "Run YDB topic workload") +{ + AddCommand(std::make_unique<TCommandWorkloadTopicRunRead>()); + AddCommand(std::make_unique<TCommandWorkloadTopicRunWrite>()); + AddCommand(std::make_unique<TCommandWorkloadTopicRunFull>()); +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h new file mode 100644 index 0000000000..ff966aff42 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h @@ -0,0 +1,13 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/common/command.h> + +namespace NYdb { + namespace NConsoleClient { + + class TCommandWorkloadTopic: public TClientCommandTree { + public: + TCommandWorkloadTopic(); + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp new file mode 100644 index 0000000000..29d85b6726 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp @@ -0,0 +1,39 @@ +#include "topic_workload_clean.h" + +#include "topic_workload_defines.h" + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> + + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopicClean::TCommandWorkloadTopicClean() + : TWorkloadCommand("clean", {}, "drop topic created in init phase") +{ +} + +void TCommandWorkloadTopicClean::Config(TConfig& config) { + TYdbCommand::Config(config); + config.SetFreeArgsNum(0); +} + +void TCommandWorkloadTopicClean::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandWorkloadTopicClean::Run(TConfig& config) { + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); + auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver); + + auto topicName = config.Database + "/" + TOPIC; + auto describeTopicResult = topicClient->DescribeTopic(topicName, {}).GetValueSync(); + if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() == 0) { + Cout << "Topic " << topicName << " does not exists.\n"; + return EXIT_FAILURE; + } + + auto result = topicClient->DropTopic(config.Database + "/" + TOPIC).GetValueSync(); + ThrowOnError(result); + return EXIT_SUCCESS; +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h new file mode 100644 index 0000000000..f1ba7a3dcc --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h @@ -0,0 +1,22 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicClean: public TWorkloadCommand { + public: + TCommandWorkloadTopicClean(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + }; + + class TCommandWorkloadTopicRun: public TClientCommandTree { + public: + TCommandWorkloadTopicRun(); + + private: + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h new file mode 100644 index 0000000000..3b9c05a1f5 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h @@ -0,0 +1,14 @@ +#pragma once + +#include <util/generic/string.h> + +#define WRITE_LOG(log, priority, str) \ + if (log->FiltrationLevel() >= priority) \ + log->Write(priority, str); + +namespace NYdb::NConsoleClient { + const TString PRODUCER_PREFIX = "workload-producer"; + const TString CONSUMER_PREFIX = "workload-consumer"; + const TString MESSAGE_GROUP_ID_PREFIX = "workload-message-group-id"; + const TString TOPIC = "workload-topic"; +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp new file mode 100644 index 0000000000..441bd2f44b --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp @@ -0,0 +1,56 @@ +#include "topic_workload_init.h" + +#include "topic_workload_defines.h" +#include "topic_workload_params.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopicInit::TCommandWorkloadTopicInit() + : TWorkloadCommand("init", {}, "Create and initialize topic for workload") + , PartitionCount(1) +{ +} + +void TCommandWorkloadTopicInit::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption('p', "partitions", "Number of partitions in the topic.") + .DefaultValue(128) + .StoreResult(&PartitionCount); + config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.") + .DefaultValue(1) + .StoreResult(&ConsumerCount); +} + +void TCommandWorkloadTopicInit::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandWorkloadTopicInit::Run(TConfig& config) { + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); + auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver); + auto topicName = config.Database + "/" + TOPIC; + + auto describeTopicResult = topicClient->DescribeTopic(topicName, {}).GetValueSync(); + if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() != 0) { + Cout << "Topic " << topicName << " already exists.\n"; + return EXIT_FAILURE; + } + + NYdb::NTopic::TCreateTopicSettings settings; + for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) { + settings.PartitioningSettings(PartitionCount, PartitionCount) + .BeginAddConsumer(TCommandWorkloadTopicParams::GenerateConsumerName(consumerIdx)) + .EndAddConsumer(); + } + + auto result = topicClient->CreateTopic(topicName, settings).GetValueSync(); + ThrowOnError(result); + + return EXIT_SUCCESS; +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h new file mode 100644 index 0000000000..ef72c07ab5 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicInit: public TWorkloadCommand { + public: + TCommandWorkloadTopicInit(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + + private: + ui32 PartitionCount; + ui32 ConsumerCount; + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp new file mode 100644 index 0000000000..b02a1cd883 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp @@ -0,0 +1,24 @@ +#include "topic_workload_params.h" + +#include "topic_workload_defines.h" + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +using namespace NYdb::NConsoleClient; + +ui32 TCommandWorkloadTopicParams::StrToCodec(const TString& str) { + THashMap<TString, NYdb::NTopic::ECodec> codecs{ + {"raw", NYdb::NTopic::ECodec::RAW}, + {"gzip", NYdb::NTopic::ECodec::GZIP}, + {"zstd", NYdb::NTopic::ECodec::ZSTD}}; + TString loweredStr(str); + loweredStr.to_lower(); + codecs.contains(loweredStr) ?: throw yexception() << "Unsupported codec: " << str; + return (ui32)codecs[loweredStr]; +} + +TString TCommandWorkloadTopicParams::GenerateConsumerName(ui32 consumerIdx) +{ + TString consumerName = TStringBuilder() << CONSUMER_PREFIX << '-' << consumerIdx; + return consumerName; +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h new file mode 100644 index 0000000000..ddf7e5a355 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h @@ -0,0 +1,15 @@ +#pragma once + +#include <util/system/types.h> +#include <util/string/type.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicParams { + public: + static ui32 StrToCodec(const TString& str); + + static TString GenerateConsumerName(ui32 consumerIdx); + }; + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp new file mode 100644 index 0000000000..b47670691a --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp @@ -0,0 +1,87 @@ +#include "topic_workload_reader.h" + +#include "topic_workload_params.h" + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> + +using namespace NYdb::NConsoleClient; + +void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams&& params) { + auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*params.Driver); + + NYdb::NTopic::TReadSessionSettings settings; + settings.ConsumerName(TCommandWorkloadTopicParams::GenerateConsumerName(params.ConsumerIdx)) + .AppendTopics(TOPIC); + + auto readSession = topicClient->CreateReadSession(settings); + WRITE_LOG(params.Log,ELogPriority::TLOG_INFO, "READER Session was created\n"); + + struct TPartitionStreamState { + ui64 StartOffset; + NYdb::NTopic::TPartitionSession::TPtr Stream; + }; + THashMap<std::pair<TString, ui64>, TPartitionStreamState> streamState; + + TInstant LastPartitionStatusRequestTime = TInstant::Zero(); + + (*params.StartedCount)++; + + const TInstant endTime = Now() + TDuration::Seconds(params.Seconds + 3); + + while (Now() < endTime && !*params.ErrorFlag) { + TInstant st = TInstant::Now(); + if (TInstant::Now() - LastPartitionStatusRequestTime > TDuration::Seconds(1)) { + for (auto& st : streamState) { + if (st.second.Stream) { + st.second.Stream->RequestStatus(); + } + } + LastPartitionStatusRequestTime = st; + } + + readSession->WaitEvent().Wait(TDuration::Seconds(1)); + TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = readSession->GetEvent(false); + if (!event) + continue; + + if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) { + WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << dataEvent->DebugString() << "\n"); + for (const auto& message : dataEvent->GetMessages()) { + auto messageGroupId = message.GetMessageGroupId(); + auto stream = message.GetPartitionSession(); + auto topic = stream->GetTopicPath(); + auto partition = stream->GetPartitionId(); + ui64 fullTime = (TInstant::Now() - message.GetCreateTime()).MilliSeconds(); + + WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << "READER Got message: " << messageGroupId << " topic " << topic << " partition " << partition << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo() << "\n"); + + params.StatsCollector->AddReaderEvent(message.GetData().Size(), fullTime); + } + dataEvent->Commit(); + } else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + auto stream = createPartitionStreamEvent->GetPartitionSession(); + ui64 startOffset = streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].StartOffset; + streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = stream; + WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << "READER Starting read " << createPartitionStreamEvent->DebugString() << " from " << startOffset << "\n"); + createPartitionStreamEvent->Confirm(); + } else if (auto* destroyPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) { + auto stream = destroyPartitionStreamEvent->GetPartitionSession(); + streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = nullptr; + destroyPartitionStreamEvent->Confirm(); + } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) { + WRITE_LOG(params.Log,ELogPriority::TLOG_ERR, TStringBuilder() << "READER Session closed: '" << closeSessionEvent->DebugString() << "'\n"); + *params.ErrorFlag = 1; + break; + } else if (auto* partitionStreamStatusEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) { + WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << partitionStreamStatusEvent->DebugString() << "\n") + + ui64 lagMessages = partitionStreamStatusEvent->GetEndOffset() - partitionStreamStatusEvent->GetCommittedOffset(); + ui64 lagTime = lagMessages == 0 ? 0 : (Now() - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds(); + + params.StatsCollector->AddLagEvent(lagMessages, lagTime); + } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) { + WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << ackEvent->DebugString() << "\n"); + } + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h new file mode 100644 index 0000000000..f46d9e8f5d --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -0,0 +1,29 @@ +#pragma once + +#include "topic_workload_defines.h" +#include "topic_workload_stats_collector.h" + +#include <library/cpp/logger/log.h> +#include <util/system/types.h> +#include <util/string/type.h> +#include <util/generic/size_literals.h> + +namespace NYdb { + namespace NConsoleClient { + struct TTopicWorkloadReaderParams { + size_t Seconds; + NYdb::TDriver* Driver; + std::shared_ptr<TLog> Log; + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + std::shared_ptr<std::atomic_bool> ErrorFlag; + std::shared_ptr<std::atomic_uint> StartedCount; + + ui32 ConsumerIdx; + }; + + class TTopicWorkloadReader { + public: + static void ReaderLoop(TTopicWorkloadReaderParams&& params); + }; + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp new file mode 100644 index 0000000000..c6d5177ff2 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp @@ -0,0 +1,149 @@ +#include "topic_workload_run_full.h" + +#include "topic_workload_defines.h" +#include "topic_workload_params.h" +#include "topic_workload_reader.h" +#include "topic_workload_writer.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <util/generic/guid.h> + +#include <sstream> +#include <future> +#include <thread> +#include <iomanip> + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopicRunFull::TCommandWorkloadTopicRunFull() + : TWorkloadCommand("full", {}, "Full workload") + , ErrorFlag(std::make_shared<std::atomic_bool>()) +{ +} + +void TCommandWorkloadTopicRunFull::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + // Common params + config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.") + .DefaultValue(10) + .StoreResult(&Seconds); + config.Opts->AddLongOption('w', "window", "Output window duration in seconds.") + .DefaultValue(1) + .StoreResult(&WindowDurationSec); + config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.") + .StoreTrue(&Quiet); + config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.") + .StoreTrue(&PrintTimestamp); + + // Specific params + config.Opts->AddLongOption('p', "producer-threads", "Number of producer threads.") + .DefaultValue(1) + .StoreResult(&ProducerThreadCount); + config.Opts->AddLongOption('t', "consumer-threads", "Number of consumer threads.") + .DefaultValue(1) + .StoreResult(&ConsumerThreadCount); + config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.") + .DefaultValue(1) + .StoreResult(&ConsumerCount); + config.Opts->AddLongOption('m', "message-size", "Message size.") + .DefaultValue(10_KB) + .StoreResult(&MessageSize); + config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.") + .DefaultValue(0) + .StoreResult(&MessageRate); + config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.") + .DefaultValue(0) + .StoreResult(&ByteRate); + config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs())) + .Optional() + .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW)) + .StoreMappedResultT<TString>(&Codec, &TCommandWorkloadTopicParams::StrToCodec); + + config.Opts->MutuallyExclusive("message-rate", "byte-rate"); +} + +void TCommandWorkloadTopicRunFull::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandWorkloadTopicRunFull::Run(TConfig& config) { + Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))); + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); + + StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(true, true, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag); + StatsCollector->PrintHeader(); + + std::vector<std::future<void>> threads; + + auto consumerStartedCount = std::make_shared<std::atomic_uint>(); + for (ui32 readerIdx = 0; readerIdx < ConsumerCount; ++readerIdx) { + for (ui32 readerThreadIdx = 0; readerThreadIdx < ConsumerThreadCount; ++readerThreadIdx) { + TTopicWorkloadReaderParams readerParams{ + .Seconds = Seconds, + .Driver = Driver.get(), + .Log = Log, + .StatsCollector = StatsCollector, + .ErrorFlag = ErrorFlag, + .StartedCount = consumerStartedCount, + + .ConsumerIdx = readerIdx}; + + threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(std::move(readerParams)); })); + } + } + while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount) + Sleep(TDuration::MilliSeconds(10)); + + auto producerStartedCount = std::make_shared<std::atomic_uint>(); + for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) { + TTopicWorkloadWriterParams writerParams{ + .Seconds = Seconds, + .Driver = Driver.get(), + .Log = Log, + .StatsCollector = StatsCollector, + .ErrorFlag = ErrorFlag, + .StartedCount = producerStartedCount, + + .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, + .ProducerThreadCount = ProducerThreadCount, + .MessageGroupId = TStringBuilder() << PRODUCER_PREFIX << "-" << CreateGuidAsString() << "-" << MESSAGE_GROUP_ID_PREFIX << '-' << writerIdx, + .MessageSize = MessageSize, + .Codec = Codec}; + + threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); })); + } + + while (*producerStartedCount != ProducerThreadCount) + Sleep(TDuration::MilliSeconds(10)); + + StatsCollector->PrintWindowStatsLoop(); + + for (auto& future : threads) { + future.wait(); + WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined.\n"); + } + + StatsCollector->PrintTotalStats(); + + if (*ErrorFlag) { + WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "Problems occured while processing messages.\n"); + return EXIT_FAILURE; + } + + if (StatsCollector->GetTotalWriteMessages() == 0) { + WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were written.\n"); + return EXIT_FAILURE; + } + if (StatsCollector->GetTotalReadMessages() == 0) { + WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were read.\n"); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h new file mode 100644 index 0000000000..49c2be1e81 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h @@ -0,0 +1,36 @@ +#pragma once + +#include "topic_workload_stats_collector.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +#include <library/cpp/logger/log.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicRunFull: public TWorkloadCommand { + public: + TCommandWorkloadTopicRunFull(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + + private: + size_t Seconds; + size_t MessageRate; + size_t ByteRate; + size_t MessageSize; + ui32 Codec; + + ui32 ProducerThreadCount; + ui32 ConsumerThreadCount; + ui32 ConsumerCount; + + std::shared_ptr<TLog> Log; + + std::shared_ptr<std::atomic_bool> ErrorFlag; + + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp new file mode 100644 index 0000000000..c1f1c46289 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp @@ -0,0 +1,102 @@ +#include "topic_workload_run_read.h" + +#include "topic_workload_defines.h" +#include "topic_workload_params.h" +#include "topic_workload_reader.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <util/generic/guid.h> + +#include <sstream> +#include <future> +#include <thread> +#include <iomanip> + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopicRunRead::TCommandWorkloadTopicRunRead() + : TWorkloadCommand("read", {}, "Read workload") + , ErrorFlag(std::make_shared<std::atomic_bool>()) +{ +} + +void TCommandWorkloadTopicRunRead::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + // Common params + config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.") + .DefaultValue(10) + .StoreResult(&Seconds); + config.Opts->AddLongOption('w', "window", "Output window duration in seconds.") + .DefaultValue(1) + .StoreResult(&WindowDurationSec); + config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.") + .StoreTrue(&Quiet); + config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.") + .StoreTrue(&PrintTimestamp); + + // Specific params + config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.") + .DefaultValue(1) + .StoreResult(&ConsumerCount); + config.Opts->AddLongOption('t', "threads", "Number of consumer threads.") + .DefaultValue(1) + .StoreResult(&ConsumerThreadCount); +} + +void TCommandWorkloadTopicRunRead::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandWorkloadTopicRunRead::Run(TConfig& config) { + Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))); + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); + + StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(false, true, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag); + StatsCollector->PrintHeader(); + + std::vector<std::future<void>> threads; + + auto consumerStartedCount = std::make_shared<std::atomic_uint>(); + for (ui32 readerIdx = 0; readerIdx < ConsumerCount; ++readerIdx) { + for (ui32 readerThreadIdx = 0; readerThreadIdx < ConsumerThreadCount; ++readerThreadIdx) { + TTopicWorkloadReaderParams readerParams{ + .Seconds = Seconds, + .Driver = Driver.get(), + .Log = Log, + .StatsCollector = StatsCollector, + .ErrorFlag = ErrorFlag, + .StartedCount = consumerStartedCount, + + .ConsumerIdx = readerIdx}; + + threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(std::move(readerParams)); })); + } + } + while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount) + Sleep(TDuration::MilliSeconds(10)); + + StatsCollector->PrintWindowStatsLoop(); + + for (auto& future : threads) { + future.wait(); + WRITE_LOG(Log,ELogPriority::TLOG_INFO, "All thread joined.\n"); + } + + StatsCollector->PrintTotalStats(); + + if (*ErrorFlag) { + WRITE_LOG(Log,ELogPriority::TLOG_EMERG, "Problems occured while reading messages.\n"); + return EXIT_FAILURE; + } + + if (StatsCollector->GetTotalReadMessages() == 0) { + WRITE_LOG(Log,ELogPriority::TLOG_EMERG, "No messages were read.\n"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h new file mode 100644 index 0000000000..bfe4ed1b1b --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h @@ -0,0 +1,31 @@ +#pragma once + +#include "topic_workload_stats_collector.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +#include <library/cpp/logger/log.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicRunRead: public TWorkloadCommand { + public: + TCommandWorkloadTopicRunRead(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + + private: + size_t Seconds; + + ui32 ConsumerThreadCount; + ui32 ConsumerCount; + + std::shared_ptr<TLog> Log; + + std::shared_ptr<std::atomic_bool> ErrorFlag; + + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp new file mode 100644 index 0000000000..22df051dfe --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp @@ -0,0 +1,117 @@ +#include "topic_workload_run_write.h" + +#include "topic_workload_defines.h" +#include "topic_workload_params.h" +#include "topic_workload_writer.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <util/generic/guid.h> + +#include <sstream> +#include <future> +#include <thread> +#include <iomanip> + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTopicRunWrite::TCommandWorkloadTopicRunWrite() + : TWorkloadCommand("write", {}, "Write workload") + , ErrorFlag(std::make_shared<std::atomic_bool>()) +{ +} + +void TCommandWorkloadTopicRunWrite::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + // Common params + config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.") + .DefaultValue(10) + .StoreResult(&Seconds); + config.Opts->AddLongOption('w', "window", "Output window duration in seconds.") + .DefaultValue(1) + .StoreResult(&WindowDurationSec); + config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.") + .StoreTrue(&Quiet); + config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.") + .StoreTrue(&PrintTimestamp); + + // Specific params + config.Opts->AddLongOption('t', "threads", "Number of producer threads.") + .DefaultValue(1) + .StoreResult(&ProducerThreadCount); + config.Opts->AddLongOption('m', "message-size", "Message size.") + .DefaultValue(10_KB) + .StoreResult(&MessageSize); + config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.") + .DefaultValue(0) + .StoreResult(&MessageRate); + config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.") + .DefaultValue(0) + .StoreResult(&ByteRate); + config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs())) + .Optional() + .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW)) + .StoreMappedResultT<TString>(&Codec, &TCommandWorkloadTopicParams::StrToCodec); + + config.Opts->MutuallyExclusive("message-rate", "byte-rate"); +} + +void TCommandWorkloadTopicRunWrite::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandWorkloadTopicRunWrite::Run(TConfig& config) { + Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))); + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); + StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(true, false, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag); + StatsCollector->PrintHeader(); + + std::vector<std::future<void>> threads; + + auto producerStartedCount = std::make_shared<std::atomic_uint>(); + for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) { + TTopicWorkloadWriterParams writerParams{ + .Seconds = Seconds, + .Driver = Driver.get(), + .Log = Log, + .StatsCollector = StatsCollector, + .ErrorFlag = ErrorFlag, + .StartedCount = producerStartedCount, + + .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, + .ProducerThreadCount = ProducerThreadCount, + .MessageGroupId = TStringBuilder() << PRODUCER_PREFIX << "-" << CreateGuidAsString() << "-" << MESSAGE_GROUP_ID_PREFIX << '-' << writerIdx, + .MessageSize = MessageSize, + .Codec = Codec}; + + threads.push_back(std::async([writerParams = std::move(writerParams)] () mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); })); + } + + while (*producerStartedCount != ProducerThreadCount) + Sleep(TDuration::MilliSeconds(10)); + + StatsCollector->PrintWindowStatsLoop(); + + for (auto& future : threads) { + future.wait(); + WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined.\n"); + } + + StatsCollector->PrintTotalStats(); + + if (*ErrorFlag) { + WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "Problems occured while writing messages.\n") + return EXIT_FAILURE; + } + + if (StatsCollector->GetTotalWriteMessages() == 0) { + WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were written.\n"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h new file mode 100644 index 0000000000..b9f19d6692 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h @@ -0,0 +1,33 @@ +#pragma once + +#include "topic_workload_stats_collector.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +#include <library/cpp/logger/log.h> + +namespace NYdb { + namespace NConsoleClient { + class TCommandWorkloadTopicRunWrite: public TWorkloadCommand { + public: + TCommandWorkloadTopicRunWrite(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + + private: + size_t Seconds; + size_t MessageRate; + size_t ByteRate; + size_t MessageSize; + ui32 Codec; + + ui32 ProducerThreadCount; + + std::shared_ptr <TLog> Log; + std::shared_ptr<std::atomic_bool> ErrorFlag; + + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + }; + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp new file mode 100644 index 0000000000..394cf2bfbe --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp @@ -0,0 +1,37 @@ +#include "topic_workload_stats.h" + +using namespace NYdb::NConsoleClient; + +TTopicWorkloadStats::TTopicWorkloadStats() + : WriteBytes(0) + , WriteMessages(0) + , WriteTimeHist(60000, 2) + , InflightMessages(0) + , LagMessages(0) + , LagTimeHist(60000, 2) + , ReadBytes(0) + , ReadMessages(0) + , FullTimeHist(60000, 5) +{ +} + +void TTopicWorkloadStats::AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages) +{ + WriteMessages++; + WriteBytes += messageSize; + WriteTimeHist.RecordValue(writeTime); + InflightMessages = Max(InflightMessages, inflightMessages); +} + +void TTopicWorkloadStats::AddReaderEvent(ui64 messageSize, ui64 fullTime) +{ + ReadMessages++; + ReadBytes += messageSize; + FullTimeHist.RecordValue(fullTime); +} + +void TTopicWorkloadStats::AddLagEvent(ui64 lagMessages, ui64 lagTime) +{ + LagMessages = Max(LagMessages, lagMessages); + LagTimeHist.RecordValue(lagTime); +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h new file mode 100644 index 0000000000..f76cdd4b9f --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h @@ -0,0 +1,26 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +namespace NYdb { + namespace NConsoleClient { + class TTopicWorkloadStats { + public: + TTopicWorkloadStats(); + + void AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages); + void AddReaderEvent(ui64 messageSize, ui64 fullTime); + void AddLagEvent(ui64 lagMessages, ui64 lagTime); + + ui64 WriteBytes; + ui64 WriteMessages; + NHdr::THistogram WriteTimeHist; + ui64 InflightMessages; + ui64 LagMessages; + NHdr::THistogram LagTimeHist; + ui64 ReadBytes; + ui64 ReadMessages; + NHdr::THistogram FullTimeHist; + }; + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp new file mode 100644 index 0000000000..63d45a4439 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp @@ -0,0 +1,121 @@ +#include "topic_workload_stats_collector.h" + +#include "topic_workload_defines.h" + +using namespace NYdb::NConsoleClient; + +TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector( + bool producer, bool consumer, + bool quiet, bool printTimestamp, + ui32 windowDurationSec, ui32 totalDurationSec, + std::shared_ptr<std::atomic_bool> errorFlag) + : Producer(producer) + , Consumer(consumer) + , Quiet(quiet) + , PrintTimestamp(printTimestamp) + , WindowDurationSec(windowDurationSec) + , TotalDurationSec(totalDurationSec) + , ErrorFlag(errorFlag) + , WindowStats(MakeHolder<TTopicWorkloadStats>()) +{ +} + +void TTopicWorkloadStatsCollector::PrintHeader() const { + if (Quiet) + return; + + Cout << "Window\t" << (Producer ? "Write speed\tWrite time\tInflight\t" : "") << (Consumer ? "Lag\tLag time\tRead speed\tFull time\t" : "") << (PrintTimestamp ? "Timestamp" : "") << Endl; + Cout << "#\t" << (Producer ? "msg/s\tMB/s\tP99(ms)\t\tmax msg\t\t" : "") << (Consumer ? "max msg\tP99(ms)\t\tmsg/s\tMB/s\tP99(ms)" : ""); + Cout << Endl; +} + +void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() { + auto StartTime = Now(); + auto StopTime = StartTime + TDuration::Seconds(TotalDurationSec + 1); + int windowIt = 1; + auto windowDuration = TDuration::Seconds(WindowDurationSec); + while (Now() < StopTime && !*ErrorFlag) { + if (Now() > StartTime + windowIt * windowDuration && !*ErrorFlag) { + PrintWindowStats(windowIt++); + } + Sleep(std::max(TDuration::Zero(), Now() - StartTime - windowIt * windowDuration)); + } +} + +void TTopicWorkloadStatsCollector::PrintWindowStats(ui32 windowIt) { + PrintStats(windowIt); + + with_lock (Lock) { + WindowStats = MakeHolder<TTopicWorkloadStats>(); + } +} +void TTopicWorkloadStatsCollector::PrintTotalStats() const { + PrintHeader(); + PrintStats({}); +} + +void TTopicWorkloadStatsCollector::PrintStats(TMaybe<ui32> windowIt) const { + if (Quiet) + return; + + with_lock (Lock) { + const auto& stats = windowIt.Empty() ? TotalStats : *WindowStats; + double seconds = windowIt.Empty() ? TotalDurationSec : WindowDurationSec; + TString totalIt = windowIt.Empty() ? "Total" : std::to_string(windowIt.GetRef()); + + Cout << totalIt; + if (Producer) { + Cout << "\t" << (int)(stats.WriteMessages / seconds) + << "\t" << (int)(stats.WriteBytes / seconds / 1024 / 1024) + << "\t" << stats.WriteTimeHist.GetValueAtPercentile(99) << "\t" + << "\t" << stats.InflightMessages << "\t"; + } + if (Consumer) { + Cout << "\t" << stats.LagMessages + << "\t" << stats.LagTimeHist.GetValueAtPercentile(99) << "\t" + << "\t" << (int)(stats.ReadMessages / seconds) + << "\t" << (int)(stats.ReadBytes / seconds / 1024 / 1024) + << "\t" << stats.FullTimeHist.GetValueAtPercentile(99) << "\t"; + } + if (PrintTimestamp) { + Cout << "\t" << Now().ToStringUpToSeconds(); + } + Cout << Endl; + } +} + +ui64 TTopicWorkloadStatsCollector::GetTotalReadMessages() const { + with_lock (Lock) { + return TotalStats.ReadMessages; + } +} + +ui64 TTopicWorkloadStatsCollector::GetTotalWriteMessages() const { + with_lock (Lock) { + return TotalStats.WriteMessages; + } +} + +void TTopicWorkloadStatsCollector::AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages) +{ + with_lock (Lock) { + WindowStats->AddWriterEvent(messageSize, writeTime, inflightMessages); + TotalStats.AddWriterEvent(messageSize, writeTime, inflightMessages); + } +} + +void TTopicWorkloadStatsCollector::AddReaderEvent(ui64 messageSize, ui64 fullTime) +{ + with_lock (Lock) { + WindowStats->AddReaderEvent(messageSize, fullTime); + TotalStats.AddReaderEvent(messageSize, fullTime); + } +} + +void TTopicWorkloadStatsCollector::AddLagEvent(ui64 lagMessages, ui64 lagTime) +{ + with_lock (Lock) { + WindowStats->AddLagEvent(lagMessages, lagTime); + TotalStats.AddLagEvent(lagMessages, lagTime); + } +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h new file mode 100644 index 0000000000..7fae55204a --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h @@ -0,0 +1,49 @@ +#pragma once + +#include "topic_workload_stats.h" + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> + +namespace NYdb { + namespace NConsoleClient { + class TTopicWorkloadStatsCollector { + public: + TTopicWorkloadStatsCollector( + bool producer, bool consumer, + bool quiet, bool printTimestamp, + ui32 windowDurationSec, ui32 totalDurationSec, + std::shared_ptr<std::atomic_bool> errorFlag); + + void PrintWindowStatsLoop(); + + void PrintHeader() const; + void PrintTotalStats() const; + + void AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages); + void AddReaderEvent(ui64 messageSize, ui64 fullTime); + void AddLagEvent(ui64 lagMessages, ui64 lagTime); + + ui64 GetTotalReadMessages() const; + ui64 GetTotalWriteMessages() const; + + private: + void PrintWindowStats(ui32 windowIt); + void PrintStats(TMaybe<ui32> windowIt) const; + + bool Producer; + bool Consumer; + + bool Quiet; + bool PrintTimestamp; + + double WindowDurationSec; + double TotalDurationSec; + + TSpinLock Lock; + std::shared_ptr<std::atomic_bool> ErrorFlag; + + THolder<TTopicWorkloadStats> WindowStats; + TTopicWorkloadStats TotalStats; + }; + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp new file mode 100644 index 0000000000..32d455a936 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp @@ -0,0 +1,219 @@ +#include "topic_workload_writer.h" + +#include <util/generic/overloaded.h> + +using namespace NYdb::NConsoleClient; + +TTopicWorkloadWriterWorker::TTopicWorkloadWriterWorker( + TTopicWorkloadWriterParams&& params) + : Params(params) + , MessageId(1) + , StartTimestamp(TInstant::Now()) + , StatsCollector(params.StatsCollector) + +{ + Closed = std::make_shared<std::atomic<bool>>(false); + GenerateMessages(); + CreateWorker(); +} + +TTopicWorkloadWriterWorker::~TTopicWorkloadWriterWorker() +{ + if (WriteSession) + WriteSession->Close(); +} + +void TTopicWorkloadWriterWorker::CreateWorker() { + if (!InitSeqNoProcessed) { + *Params.ErrorFlag = 1; + WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, "WRITER no progress while writing by protocol.\n"); + return; + } + + MessageGroupId = Params.MessageGroupId; + + WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER create worker for " << MessageGroupId << ".\n"); + + if (WriteSession) + WriteSession->Close(); + + CreateTopicWorker(); +} + +void TTopicWorkloadWriterWorker::Close() { + Closed->store(true); + if (WriteSession) + WriteSession->Close(TDuration::Zero()); +} + +const size_t GENERATED_MESSAGES_COUNT = 32; + +void TTopicWorkloadWriterWorker::GenerateMessages() { + TStringBuilder res; + for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) { + res.clear(); + while (res.Size() < Params.MessageSize) + res << RandomNumber<ui64>(UINT64_MAX); + GeneratedMessages.push_back(res); + } +} + +TString TTopicWorkloadWriterWorker::GetGeneratedMessage() const { + return GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT]; +} + +TDuration TTopicWorkloadWriterWorker::Process() { + if (!InitSeqNoProcessed) { + if (!InitSeqNo.HasValue() && !InitSeqNo.Wait(TDuration::Seconds(1))) { + WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, "WRITER no initial sequence number\n"); + return TDuration::Seconds(1); + } + if (InitSeqNo.HasException()) { + try { + InitSeqNo.GetValue(); + } catch (yexception e) { + WRITE_LOG(Params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "Future exception: " << e.what() << "\n"); + } + *Params.ErrorFlag = 1; + return TDuration::Zero(); + } + + InitSeqNoProcessed = true; + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER sequence number initialized " << InitSeqNo.GetValue() << "\n"); + if (MessageId != InitSeqNo.GetValue() + 1) { + MessageId = InitSeqNo.GetValue() + 1; + AckedMessageId = MessageId - 1; + } + } + + ui64 elapsedSeconds = (Now() - StartTimestamp).Seconds(); + const ui64 bytesMustBeWritten = Params.ByteRate == 0 ? UINT64_MAX : elapsedSeconds * Params.ByteRate / Params.ProducerThreadCount; + + while (true) { + auto events = WriteSession->GetEvents(false); + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Got " << events.size() << " events \n"); + + for (auto& e : events) + ProcessEvent(e); + + if (BytesWritten < bytesMustBeWritten && ContinuationToken.Defined()) { + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Writing message " << MessageId << "\n"); + + TString data = GetGeneratedMessage(); + size_t messageSize = data.size(); + + TMaybe<TInstant> createTimestamp = + Params.ByteRate == 0 ? TMaybe<TInstant>(Nothing()) + : StartTimestamp + TDuration::Seconds((double)BytesWritten / Params.ByteRate * Params.ProducerThreadCount); + + InflightMessages[MessageId] = {messageSize, createTimestamp.GetOrElse(Now())}; + + BytesWritten += messageSize; + + WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId++, createTimestamp); + ContinuationToken.Clear(); + } + + if (events.empty()) + break; + } + + const TDuration timeToNextMessage = TDuration::MilliSeconds(50); + return timeToNextMessage; +} + +bool TTopicWorkloadWriterWorker::ProcessEvent( + NYdb::NTopic::TWriteSessionEvent::TEvent& event) { + return std::visit( + TOverloaded{ + [this](const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { + return ProcessAckEvent(event); + }, + [this](NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { + return ProcessReadyToAcceptEvent(event); + }, + [this](const NYdb::NTopic::TSessionClosedEvent& event) { + return ProcessSessionClosedEvent(event); + }}, event); +}; + +bool TTopicWorkloadWriterWorker::ProcessAckEvent( + const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { + bool hasProgress = false; + //! Acks just confirm that message was received and saved by server + //! successfully. Here we just count acked messages to check, that everything + //! written is confirmed. + for (const auto& ack : event.Acks) { + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Got ack for write " << AckedMessageId << "\n"); + AckedMessageId = ack.SeqNo; + + auto inflightMessageIter = InflightMessages.find(AckedMessageId); + if (inflightMessageIter == InflightMessages.end()) + { + *Params.ErrorFlag = 1; + WRITE_LOG(Params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "WRITER Unknown AckedMessageId " << AckedMessageId << "\n"); + return false; + } + + ui64 writeTime = (Now() - inflightMessageIter->second.MessageTime).MilliSeconds(); + ui64 messageSize = inflightMessageIter->second.MessageSize; + InflightMessages.erase(inflightMessageIter); + + StatsCollector->AddWriterEvent(messageSize, writeTime, InflightMessages.size()); + + hasProgress = true; + } + return hasProgress; +} + +bool TTopicWorkloadWriterWorker::ProcessReadyToAcceptEvent( + NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { + //! TReadyToAcceptEvent provide continue tokens - an object to perform further + //! writes. + //! Do NOT lose continue tokens! + + ContinuationToken = std::move(event.ContinuationToken); + + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, "WRITER Got new continue token\n"); + + return true; +} + +bool TTopicWorkloadWriterWorker::ProcessSessionClosedEvent( + const NYdb::NTopic::TSessionClosedEvent& event) { + WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER Got close event: " << event.DebugString() << "\n"); + //! Session is closed, stop any work with it. + Y_FAIL("session closed"); + return false; +} + +void TTopicWorkloadWriterWorker::CreateTopicWorker() { + WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, "WRITER Creating worker\n"); + Y_VERIFY(Params.Driver); + NYdb::NTopic::TWriteSessionSettings settings; + settings.Codec((NYdb::NTopic::ECodec)Params.Codec); + settings.Path(TOPIC); + settings.ProducerId(MessageGroupId).MessageGroupId(MessageGroupId); + WriteSession = NYdb::NTopic::TTopicClient(*Params.Driver).CreateWriteSession(settings); + InitSeqNo = WriteSession->GetInitSeqNo(); + InitSeqNoProcessed = false; +} + +void TTopicWorkloadWriterWorker::WriterLoop(TTopicWorkloadWriterParams&& params) { + TTopicWorkloadWriterWorker writer(std::move(params)); + + (*params.StartedCount)++; + + WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER started " << Now().ToStringUpToSeconds() << "\n"); + + auto endTime = TInstant::Now() + TDuration::Seconds(params.Seconds); + while (Now() < endTime && !*params.ErrorFlag) { + TDuration delta = TDuration::MilliSeconds(100); + auto d = writer.Process(); + delta = Min(d, delta); + Sleep(delta); + } + + + WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER finished " << Now().ToStringUpToSeconds() << "\n"); +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h new file mode 100644 index 0000000000..70a7b461c8 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h @@ -0,0 +1,81 @@ +#pragma once + +#include "topic_workload_defines.h" +#include "topic_workload_stats_collector.h" + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <library/cpp/logger/log.h> +#include <util/generic/string.h> + +namespace NYdb { + namespace NConsoleClient { + + struct TTopicWorkloadWriterParams { + size_t Seconds; + NYdb::TDriver* Driver; + std::shared_ptr<TLog> Log; + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + std::shared_ptr<std::atomic<bool>> ErrorFlag; + std::shared_ptr<std::atomic_uint> StartedCount; + + size_t ByteRate; + ui32 ProducerThreadCount; + TString MessageGroupId; + size_t MessageSize; + ui32 Codec = 0; + }; + + class TTopicWorkloadWriterWorker { + public: + TTopicWorkloadWriterWorker(TTopicWorkloadWriterParams&& params); + ~TTopicWorkloadWriterWorker(); + + void Close(); + + TDuration Process(); + + void CreateWorker(); + + void CreateTopicWorker(); + + static void WriterLoop(TTopicWorkloadWriterParams&& params); + + private: + bool ProcessAckEvent(const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event); + + bool ProcessReadyToAcceptEvent(NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event); + bool ProcessSessionClosedEvent(const NYdb::NTopic::TSessionClosedEvent& event); + + bool ProcessEvent(NYdb::NTopic::TWriteSessionEvent::TEvent& event); + + TString GetGeneratedMessage() const; + void GenerateMessages(); + + + + TTopicWorkloadWriterParams Params; + ui64 MessageId = 0; + ui64 AckedMessageId = 0; + ui64 BytesWritten = 0; + TString MessageGroupId; + std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession; + TInstant StartTimestamp; + + std::vector<TString> GeneratedMessages; + + NThreading::TFuture<ui64> InitSeqNo; + bool InitSeqNoProcessed = true; + TMaybe<NTopic::TContinuationToken> ContinuationToken; + + std::shared_ptr<std::atomic<bool>> Closed; + std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; + + struct TInflightMessage { + size_t MessageSize; + TInstant MessageTime; + }; + THashMap<ui64, TInflightMessage> InflightMessages; + }; + } +}
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index ea0b2d8584..ba67085b69 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -77,24 +77,9 @@ namespace NYdb::NConsoleClient { bool IsStreamingFormat(EMessagingFormat format) { return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated; } - - ELogPriority VerbosityLevelToELogPriority(TClientCommand::TConfig::EVerbosityLevel lvl) { - switch (lvl) { - case TClientCommand::TConfig::EVerbosityLevel::NONE: - return ELogPriority::TLOG_EMERG; - case TClientCommand::TConfig::EVerbosityLevel::DEBUG: - return ELogPriority::TLOG_DEBUG; - case TClientCommand::TConfig::EVerbosityLevel::INFO: - return ELogPriority::TLOG_INFO; - case TClientCommand::TConfig::EVerbosityLevel::WARN: - return ELogPriority::TLOG_WARNING; - default: - return ELogPriority::TLOG_EMERG; - } - } } // namespace - namespace { + TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) { TStringStream description; description << descriptionPrefix << ". Available codecs: "; @@ -109,20 +94,21 @@ namespace NYdb::NConsoleClient { return description.Str(); } + +namespace { + NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) { + auto exists = ExistingCodecs.find(to_lower(codecStr)); + if (exists == ExistingCodecs.end()) { + throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; + } - NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) { - auto exists = ExistingCodecs.find(to_lower(codecStr)); - if (exists == ExistingCodecs.end()) { - throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; - } + if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) { + throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; + } - if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) { - throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; + return exists->second; } - - return exists->second; } - } void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) { TString description = PrepareAllowedCodecsDescription("Comma-separated list of supported codecs", supportedCodecs); @@ -655,7 +641,7 @@ namespace NYdb::NConsoleClient { ValidateConfig(); auto driver = - std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", VerbosityLevelToELogPriority(config.VerbosityLevel)))); + std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); NTopic::TTopicClient topicClient(*driver); auto readSession = topicClient.CreateReadSession(PrepareReadSessionSettings()); @@ -783,7 +769,7 @@ namespace NYdb::NConsoleClient { SetInterruptHandlers(); auto driver = - std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", VerbosityLevelToELogPriority(config.VerbosityLevel)))); + std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); NTopic::TTopicClient topicClient(*driver); { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 5daa65a4a5..ca8af79c62 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -10,6 +10,7 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> namespace NYdb::NConsoleClient { + TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs); TVector<NYdb::NTopic::ECodec> InitAllowedCodecs(); const TVector<NYdb::NTopic::ECodec> AllowedCodecs = InitAllowedCodecs(); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index c7b2744d65..8e710f497d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -3,6 +3,7 @@ #include "stock_workload.h" #include "kv_workload.h" #include "click_bench.h" +#include "topic_workload/topic_workload.h" #include <ydb/library/workload/workload_factory.h> #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> @@ -38,6 +39,7 @@ TCommandWorkload::TCommandWorkload() AddCommand(std::make_unique<TCommandStock>()); AddCommand(std::make_unique<TCommandKv>()); AddCommand(std::make_unique<TCommandClickBench>()); + AddCommand(std::make_unique<TCommandWorkloadTopic>()); } TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description) diff --git a/ydb/public/lib/ydb_cli/common/command.cpp b/ydb/public/lib/ydb_cli/common/command.cpp index 13d624b651..250089e4f9 100644 --- a/ydb/public/lib/ydb_cli/common/command.cpp +++ b/ydb/public/lib/ydb_cli/common/command.cpp @@ -98,6 +98,21 @@ TClientCommand::TClientCommand(const TString& name, const std::initializer_list< Opts.SetWrap(Max(Opts.Wrap_, static_cast<ui32>(TermWidth()))); } +ELogPriority TClientCommand::TConfig::VerbosityLevelToELogPriority(TClientCommand::TConfig::EVerbosityLevel lvl) { + switch (lvl) { + case TClientCommand::TConfig::EVerbosityLevel::NONE: + return ELogPriority::TLOG_EMERG; + case TClientCommand::TConfig::EVerbosityLevel::DEBUG: + return ELogPriority::TLOG_DEBUG; + case TClientCommand::TConfig::EVerbosityLevel::INFO: + return ELogPriority::TLOG_INFO; + case TClientCommand::TConfig::EVerbosityLevel::WARN: + return ELogPriority::TLOG_WARNING; + default: + return ELogPriority::TLOG_ERR; + } +} + TClientCommand::TOptsParseOneLevelResult::TOptsParseOneLevelResult(TConfig& config) { int _argc = 1; int levels = 1; diff --git a/ydb/public/lib/ydb_cli/common/command.h b/ydb/public/lib/ydb_cli/common/command.h index c3c02ba86c..4ce1aad157 100644 --- a/ydb/public/lib/ydb_cli/common/command.h +++ b/ydb/public/lib/ydb_cli/common/command.h @@ -6,6 +6,7 @@ #include <library/cpp/getopt/last_getopt.h> #include <library/cpp/colorizer/colors.h> +#include <library/cpp/logger/priority.h> #include <util/generic/strbuf.h> #include <util/generic/vector.h> #include <util/charset/utf8.h> @@ -70,6 +71,8 @@ public: DEBUG = 3, }; + static ELogPriority VerbosityLevelToELogPriority(EVerbosityLevel lvl); + int ArgC; char** ArgV; int InitialArgC; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 4d9d519224..b2d24a177c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -258,7 +258,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain std::function<void(bool)> connectTimeoutCallback; if (!status.Ok()) { - LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues)); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 083ec713c6..76d782a954 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -98,7 +98,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); return result; } - LOG_LAZY(DbDriverState->Log, TLOG_INFO, + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues) ); @@ -119,7 +119,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat ResetForRetryImpl(); } else { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 992eabbc38..3ffa76eab6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -98,7 +98,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat return result; } LOG_LAZY(DbDriverState->Log, - TLOG_INFO, + TLOG_ERR, LogPrefix() << "Got error. Status: " << status.Status << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues) ); @@ -119,7 +119,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat ResetForRetryImpl(); } else { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } |