aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-04-05 17:22:35 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-04-05 17:22:35 +0300
commit54ad0af8cfe99687f95779e2d97d3e6ad6b350d4 (patch)
tree15803c7c3914e85fc5033a5426abb55d50bf4516
parentee97871cc4cea65efe6f12af3599773ed6a7833a (diff)
downloadydb-54ad0af8cfe99687f95779e2d97d3e6ad6b350d4.tar.gz
workload topic
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt41
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt42
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt42
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt17
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt41
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp26
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h13
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp39
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h22
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h14
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp56
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h19
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp24
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h15
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp87
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h29
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp149
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h36
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp102
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h31
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp117
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h33
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp37
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h26
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp121
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h49
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp219
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h81
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp42
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h1
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/common/command.cpp15
-rw-r--r--ydb/public/lib/ydb_cli/common/command.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp4
40 files changed, 1580 insertions, 33 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
index 0a0a67946b..05899f2c44 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(topic_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -25,6 +26,7 @@ target_link_libraries(clicommands PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -79,6 +81,7 @@ target_link_libraries(clicommands.global PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
index 9ea855179b..a3cb431493 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(topic_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -26,6 +27,7 @@ target_link_libraries(clicommands PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -81,6 +83,7 @@ target_link_libraries(clicommands.global PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
index 9ea855179b..a3cb431493 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(topic_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -26,6 +27,7 @@ target_link_libraries(clicommands PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -81,6 +83,7 @@ target_link_libraries(clicommands.global PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
index 0a0a67946b..05899f2c44 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(topic_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -25,6 +26,7 @@ target_link_libraries(clicommands PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -79,6 +81,7 @@ target_link_libraries(clicommands.global PUBLIC
ydb-library-workload
public-lib-operation_id
common
+ topic_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..ce4312b20f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(topic_workload)
+target_link_libraries(topic_workload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(topic_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..1499a10225
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,42 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(topic_workload)
+target_link_libraries(topic_workload PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(topic_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..1499a10225
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,42 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(topic_workload)
+target_link_libraries(topic_workload PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(topic_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt
new file mode 100644
index 0000000000..a692f90f36
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64")
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..ce4312b20f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(topic_workload)
+target_link_libraries(topic_workload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(topic_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
new file mode 100644
index 0000000000..765da9f9c3
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
@@ -0,0 +1,26 @@
+#include "topic_workload.h"
+
+#include "topic_workload_defines.h"
+#include "topic_workload_clean.h"
+#include "topic_workload_init.h"
+#include "topic_workload_run_read.h"
+#include "topic_workload_run_write.h"
+#include "topic_workload_run_full.h"
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopic::TCommandWorkloadTopic()
+ : TClientCommandTree("topic", {}, "YDB topic workload")
+{
+ AddCommand(std::make_unique<TCommandWorkloadTopicInit>());
+ AddCommand(std::make_unique<TCommandWorkloadTopicClean>());
+ AddCommand(std::make_unique<TCommandWorkloadTopicRun>());
+}
+
+TCommandWorkloadTopicRun::TCommandWorkloadTopicRun()
+ : TClientCommandTree("run", {}, "Run YDB topic workload")
+{
+ AddCommand(std::make_unique<TCommandWorkloadTopicRunRead>());
+ AddCommand(std::make_unique<TCommandWorkloadTopicRunWrite>());
+ AddCommand(std::make_unique<TCommandWorkloadTopicRunFull>());
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h
new file mode 100644
index 0000000000..ff966aff42
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/common/command.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+
+ class TCommandWorkloadTopic: public TClientCommandTree {
+ public:
+ TCommandWorkloadTopic();
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
new file mode 100644
index 0000000000..29d85b6726
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
@@ -0,0 +1,39 @@
+#include "topic_workload_clean.h"
+
+#include "topic_workload_defines.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopicClean::TCommandWorkloadTopicClean()
+ : TWorkloadCommand("clean", {}, "drop topic created in init phase")
+{
+}
+
+void TCommandWorkloadTopicClean::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+ config.SetFreeArgsNum(0);
+}
+
+void TCommandWorkloadTopicClean::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTopicClean::Run(TConfig& config) {
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
+ auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver);
+
+ auto topicName = config.Database + "/" + TOPIC;
+ auto describeTopicResult = topicClient->DescribeTopic(topicName, {}).GetValueSync();
+ if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() == 0) {
+ Cout << "Topic " << topicName << " does not exists.\n";
+ return EXIT_FAILURE;
+ }
+
+ auto result = topicClient->DropTopic(config.Database + "/" + TOPIC).GetValueSync();
+ ThrowOnError(result);
+ return EXIT_SUCCESS;
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h
new file mode 100644
index 0000000000..f1ba7a3dcc
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicClean: public TWorkloadCommand {
+ public:
+ TCommandWorkloadTopicClean();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+ };
+
+ class TCommandWorkloadTopicRun: public TClientCommandTree {
+ public:
+ TCommandWorkloadTopicRun();
+
+ private:
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h
new file mode 100644
index 0000000000..3b9c05a1f5
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+#define WRITE_LOG(log, priority, str) \
+ if (log->FiltrationLevel() >= priority) \
+ log->Write(priority, str);
+
+namespace NYdb::NConsoleClient {
+ const TString PRODUCER_PREFIX = "workload-producer";
+ const TString CONSUMER_PREFIX = "workload-consumer";
+ const TString MESSAGE_GROUP_ID_PREFIX = "workload-message-group-id";
+ const TString TOPIC = "workload-topic";
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
new file mode 100644
index 0000000000..441bd2f44b
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
@@ -0,0 +1,56 @@
+#include "topic_workload_init.h"
+
+#include "topic_workload_defines.h"
+#include "topic_workload_params.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopicInit::TCommandWorkloadTopicInit()
+ : TWorkloadCommand("init", {}, "Create and initialize topic for workload")
+ , PartitionCount(1)
+{
+}
+
+void TCommandWorkloadTopicInit::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption('p', "partitions", "Number of partitions in the topic.")
+ .DefaultValue(128)
+ .StoreResult(&PartitionCount);
+ config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.")
+ .DefaultValue(1)
+ .StoreResult(&ConsumerCount);
+}
+
+void TCommandWorkloadTopicInit::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTopicInit::Run(TConfig& config) {
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
+ auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver);
+ auto topicName = config.Database + "/" + TOPIC;
+
+ auto describeTopicResult = topicClient->DescribeTopic(topicName, {}).GetValueSync();
+ if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() != 0) {
+ Cout << "Topic " << topicName << " already exists.\n";
+ return EXIT_FAILURE;
+ }
+
+ NYdb::NTopic::TCreateTopicSettings settings;
+ for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) {
+ settings.PartitioningSettings(PartitionCount, PartitionCount)
+ .BeginAddConsumer(TCommandWorkloadTopicParams::GenerateConsumerName(consumerIdx))
+ .EndAddConsumer();
+ }
+
+ auto result = topicClient->CreateTopic(topicName, settings).GetValueSync();
+ ThrowOnError(result);
+
+ return EXIT_SUCCESS;
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h
new file mode 100644
index 0000000000..ef72c07ab5
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicInit: public TWorkloadCommand {
+ public:
+ TCommandWorkloadTopicInit();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+ private:
+ ui32 PartitionCount;
+ ui32 ConsumerCount;
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
new file mode 100644
index 0000000000..b02a1cd883
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.cpp
@@ -0,0 +1,24 @@
+#include "topic_workload_params.h"
+
+#include "topic_workload_defines.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+using namespace NYdb::NConsoleClient;
+
+ui32 TCommandWorkloadTopicParams::StrToCodec(const TString& str) {
+ THashMap<TString, NYdb::NTopic::ECodec> codecs{
+ {"raw", NYdb::NTopic::ECodec::RAW},
+ {"gzip", NYdb::NTopic::ECodec::GZIP},
+ {"zstd", NYdb::NTopic::ECodec::ZSTD}};
+ TString loweredStr(str);
+ loweredStr.to_lower();
+ codecs.contains(loweredStr) ?: throw yexception() << "Unsupported codec: " << str;
+ return (ui32)codecs[loweredStr];
+}
+
+TString TCommandWorkloadTopicParams::GenerateConsumerName(ui32 consumerIdx)
+{
+ TString consumerName = TStringBuilder() << CONSUMER_PREFIX << '-' << consumerIdx;
+ return consumerName;
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h
new file mode 100644
index 0000000000..ddf7e5a355
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <util/system/types.h>
+#include <util/string/type.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicParams {
+ public:
+ static ui32 StrToCodec(const TString& str);
+
+ static TString GenerateConsumerName(ui32 consumerIdx);
+ };
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
new file mode 100644
index 0000000000..b47670691a
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
@@ -0,0 +1,87 @@
+#include "topic_workload_reader.h"
+
+#include "topic_workload_params.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+using namespace NYdb::NConsoleClient;
+
+void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams&& params) {
+ auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*params.Driver);
+
+ NYdb::NTopic::TReadSessionSettings settings;
+ settings.ConsumerName(TCommandWorkloadTopicParams::GenerateConsumerName(params.ConsumerIdx))
+ .AppendTopics(TOPIC);
+
+ auto readSession = topicClient->CreateReadSession(settings);
+ WRITE_LOG(params.Log,ELogPriority::TLOG_INFO, "READER Session was created\n");
+
+ struct TPartitionStreamState {
+ ui64 StartOffset;
+ NYdb::NTopic::TPartitionSession::TPtr Stream;
+ };
+ THashMap<std::pair<TString, ui64>, TPartitionStreamState> streamState;
+
+ TInstant LastPartitionStatusRequestTime = TInstant::Zero();
+
+ (*params.StartedCount)++;
+
+ const TInstant endTime = Now() + TDuration::Seconds(params.Seconds + 3);
+
+ while (Now() < endTime && !*params.ErrorFlag) {
+ TInstant st = TInstant::Now();
+ if (TInstant::Now() - LastPartitionStatusRequestTime > TDuration::Seconds(1)) {
+ for (auto& st : streamState) {
+ if (st.second.Stream) {
+ st.second.Stream->RequestStatus();
+ }
+ }
+ LastPartitionStatusRequestTime = st;
+ }
+
+ readSession->WaitEvent().Wait(TDuration::Seconds(1));
+ TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = readSession->GetEvent(false);
+ if (!event)
+ continue;
+
+ if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << dataEvent->DebugString() << "\n");
+ for (const auto& message : dataEvent->GetMessages()) {
+ auto messageGroupId = message.GetMessageGroupId();
+ auto stream = message.GetPartitionSession();
+ auto topic = stream->GetTopicPath();
+ auto partition = stream->GetPartitionId();
+ ui64 fullTime = (TInstant::Now() - message.GetCreateTime()).MilliSeconds();
+
+ WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << "READER Got message: " << messageGroupId << " topic " << topic << " partition " << partition << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo() << "\n");
+
+ params.StatsCollector->AddReaderEvent(message.GetData().Size(), fullTime);
+ }
+ dataEvent->Commit();
+ } else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ auto stream = createPartitionStreamEvent->GetPartitionSession();
+ ui64 startOffset = streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].StartOffset;
+ streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = stream;
+ WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << "READER Starting read " << createPartitionStreamEvent->DebugString() << " from " << startOffset << "\n");
+ createPartitionStreamEvent->Confirm();
+ } else if (auto* destroyPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
+ auto stream = destroyPartitionStreamEvent->GetPartitionSession();
+ streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = nullptr;
+ destroyPartitionStreamEvent->Confirm();
+ } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
+ WRITE_LOG(params.Log,ELogPriority::TLOG_ERR, TStringBuilder() << "READER Session closed: '" << closeSessionEvent->DebugString() << "'\n");
+ *params.ErrorFlag = 1;
+ break;
+ } else if (auto* partitionStreamStatusEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) {
+ WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << partitionStreamStatusEvent->DebugString() << "\n")
+
+ ui64 lagMessages = partitionStreamStatusEvent->GetEndOffset() - partitionStreamStatusEvent->GetCommittedOffset();
+ ui64 lagTime = lagMessages == 0 ? 0 : (Now() - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds();
+
+ params.StatsCollector->AddLagEvent(lagMessages, lagTime);
+ } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
+ WRITE_LOG(params.Log,ELogPriority::TLOG_DEBUG, TStringBuilder() << ackEvent->DebugString() << "\n");
+ }
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
new file mode 100644
index 0000000000..f46d9e8f5d
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "topic_workload_defines.h"
+#include "topic_workload_stats_collector.h"
+
+#include <library/cpp/logger/log.h>
+#include <util/system/types.h>
+#include <util/string/type.h>
+#include <util/generic/size_literals.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ struct TTopicWorkloadReaderParams {
+ size_t Seconds;
+ NYdb::TDriver* Driver;
+ std::shared_ptr<TLog> Log;
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+ std::shared_ptr<std::atomic_bool> ErrorFlag;
+ std::shared_ptr<std::atomic_uint> StartedCount;
+
+ ui32 ConsumerIdx;
+ };
+
+ class TTopicWorkloadReader {
+ public:
+ static void ReaderLoop(TTopicWorkloadReaderParams&& params);
+ };
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
new file mode 100644
index 0000000000..c6d5177ff2
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
@@ -0,0 +1,149 @@
+#include "topic_workload_run_full.h"
+
+#include "topic_workload_defines.h"
+#include "topic_workload_params.h"
+#include "topic_workload_reader.h"
+#include "topic_workload_writer.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <util/generic/guid.h>
+
+#include <sstream>
+#include <future>
+#include <thread>
+#include <iomanip>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopicRunFull::TCommandWorkloadTopicRunFull()
+ : TWorkloadCommand("full", {}, "Full workload")
+ , ErrorFlag(std::make_shared<std::atomic_bool>())
+{
+}
+
+void TCommandWorkloadTopicRunFull::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ // Common params
+ config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.")
+ .DefaultValue(10)
+ .StoreResult(&Seconds);
+ config.Opts->AddLongOption('w', "window", "Output window duration in seconds.")
+ .DefaultValue(1)
+ .StoreResult(&WindowDurationSec);
+ config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.")
+ .StoreTrue(&Quiet);
+ config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.")
+ .StoreTrue(&PrintTimestamp);
+
+ // Specific params
+ config.Opts->AddLongOption('p', "producer-threads", "Number of producer threads.")
+ .DefaultValue(1)
+ .StoreResult(&ProducerThreadCount);
+ config.Opts->AddLongOption('t', "consumer-threads", "Number of consumer threads.")
+ .DefaultValue(1)
+ .StoreResult(&ConsumerThreadCount);
+ config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.")
+ .DefaultValue(1)
+ .StoreResult(&ConsumerCount);
+ config.Opts->AddLongOption('m', "message-size", "Message size.")
+ .DefaultValue(10_KB)
+ .StoreResult(&MessageSize);
+ config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.")
+ .DefaultValue(0)
+ .StoreResult(&MessageRate);
+ config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.")
+ .DefaultValue(0)
+ .StoreResult(&ByteRate);
+ config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs()))
+ .Optional()
+ .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW))
+ .StoreMappedResultT<TString>(&Codec, &TCommandWorkloadTopicParams::StrToCodec);
+
+ config.Opts->MutuallyExclusive("message-rate", "byte-rate");
+}
+
+void TCommandWorkloadTopicRunFull::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
+ Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)));
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
+
+ StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(true, true, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag);
+ StatsCollector->PrintHeader();
+
+ std::vector<std::future<void>> threads;
+
+ auto consumerStartedCount = std::make_shared<std::atomic_uint>();
+ for (ui32 readerIdx = 0; readerIdx < ConsumerCount; ++readerIdx) {
+ for (ui32 readerThreadIdx = 0; readerThreadIdx < ConsumerThreadCount; ++readerThreadIdx) {
+ TTopicWorkloadReaderParams readerParams{
+ .Seconds = Seconds,
+ .Driver = Driver.get(),
+ .Log = Log,
+ .StatsCollector = StatsCollector,
+ .ErrorFlag = ErrorFlag,
+ .StartedCount = consumerStartedCount,
+
+ .ConsumerIdx = readerIdx};
+
+ threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(std::move(readerParams)); }));
+ }
+ }
+ while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount)
+ Sleep(TDuration::MilliSeconds(10));
+
+ auto producerStartedCount = std::make_shared<std::atomic_uint>();
+ for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) {
+ TTopicWorkloadWriterParams writerParams{
+ .Seconds = Seconds,
+ .Driver = Driver.get(),
+ .Log = Log,
+ .StatsCollector = StatsCollector,
+ .ErrorFlag = ErrorFlag,
+ .StartedCount = producerStartedCount,
+
+ .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
+ .ProducerThreadCount = ProducerThreadCount,
+ .MessageGroupId = TStringBuilder() << PRODUCER_PREFIX << "-" << CreateGuidAsString() << "-" << MESSAGE_GROUP_ID_PREFIX << '-' << writerIdx,
+ .MessageSize = MessageSize,
+ .Codec = Codec};
+
+ threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); }));
+ }
+
+ while (*producerStartedCount != ProducerThreadCount)
+ Sleep(TDuration::MilliSeconds(10));
+
+ StatsCollector->PrintWindowStatsLoop();
+
+ for (auto& future : threads) {
+ future.wait();
+ WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined.\n");
+ }
+
+ StatsCollector->PrintTotalStats();
+
+ if (*ErrorFlag) {
+ WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "Problems occured while processing messages.\n");
+ return EXIT_FAILURE;
+ }
+
+ if (StatsCollector->GetTotalWriteMessages() == 0) {
+ WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were written.\n");
+ return EXIT_FAILURE;
+ }
+ if (StatsCollector->GetTotalReadMessages() == 0) {
+ WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were read.\n");
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h
new file mode 100644
index 0000000000..49c2be1e81
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include "topic_workload_stats_collector.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+#include <library/cpp/logger/log.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicRunFull: public TWorkloadCommand {
+ public:
+ TCommandWorkloadTopicRunFull();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+ private:
+ size_t Seconds;
+ size_t MessageRate;
+ size_t ByteRate;
+ size_t MessageSize;
+ ui32 Codec;
+
+ ui32 ProducerThreadCount;
+ ui32 ConsumerThreadCount;
+ ui32 ConsumerCount;
+
+ std::shared_ptr<TLog> Log;
+
+ std::shared_ptr<std::atomic_bool> ErrorFlag;
+
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
new file mode 100644
index 0000000000..c1f1c46289
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp
@@ -0,0 +1,102 @@
+#include "topic_workload_run_read.h"
+
+#include "topic_workload_defines.h"
+#include "topic_workload_params.h"
+#include "topic_workload_reader.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <util/generic/guid.h>
+
+#include <sstream>
+#include <future>
+#include <thread>
+#include <iomanip>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopicRunRead::TCommandWorkloadTopicRunRead()
+ : TWorkloadCommand("read", {}, "Read workload")
+ , ErrorFlag(std::make_shared<std::atomic_bool>())
+{
+}
+
+void TCommandWorkloadTopicRunRead::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ // Common params
+ config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.")
+ .DefaultValue(10)
+ .StoreResult(&Seconds);
+ config.Opts->AddLongOption('w', "window", "Output window duration in seconds.")
+ .DefaultValue(1)
+ .StoreResult(&WindowDurationSec);
+ config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.")
+ .StoreTrue(&Quiet);
+ config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.")
+ .StoreTrue(&PrintTimestamp);
+
+ // Specific params
+ config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.")
+ .DefaultValue(1)
+ .StoreResult(&ConsumerCount);
+ config.Opts->AddLongOption('t', "threads", "Number of consumer threads.")
+ .DefaultValue(1)
+ .StoreResult(&ConsumerThreadCount);
+}
+
+void TCommandWorkloadTopicRunRead::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTopicRunRead::Run(TConfig& config) {
+ Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)));
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
+
+ StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(false, true, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag);
+ StatsCollector->PrintHeader();
+
+ std::vector<std::future<void>> threads;
+
+ auto consumerStartedCount = std::make_shared<std::atomic_uint>();
+ for (ui32 readerIdx = 0; readerIdx < ConsumerCount; ++readerIdx) {
+ for (ui32 readerThreadIdx = 0; readerThreadIdx < ConsumerThreadCount; ++readerThreadIdx) {
+ TTopicWorkloadReaderParams readerParams{
+ .Seconds = Seconds,
+ .Driver = Driver.get(),
+ .Log = Log,
+ .StatsCollector = StatsCollector,
+ .ErrorFlag = ErrorFlag,
+ .StartedCount = consumerStartedCount,
+
+ .ConsumerIdx = readerIdx};
+
+ threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(std::move(readerParams)); }));
+ }
+ }
+ while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount)
+ Sleep(TDuration::MilliSeconds(10));
+
+ StatsCollector->PrintWindowStatsLoop();
+
+ for (auto& future : threads) {
+ future.wait();
+ WRITE_LOG(Log,ELogPriority::TLOG_INFO, "All thread joined.\n");
+ }
+
+ StatsCollector->PrintTotalStats();
+
+ if (*ErrorFlag) {
+ WRITE_LOG(Log,ELogPriority::TLOG_EMERG, "Problems occured while reading messages.\n");
+ return EXIT_FAILURE;
+ }
+
+ if (StatsCollector->GetTotalReadMessages() == 0) {
+ WRITE_LOG(Log,ELogPriority::TLOG_EMERG, "No messages were read.\n");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h
new file mode 100644
index 0000000000..bfe4ed1b1b
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "topic_workload_stats_collector.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+#include <library/cpp/logger/log.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicRunRead: public TWorkloadCommand {
+ public:
+ TCommandWorkloadTopicRunRead();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+ private:
+ size_t Seconds;
+
+ ui32 ConsumerThreadCount;
+ ui32 ConsumerCount;
+
+ std::shared_ptr<TLog> Log;
+
+ std::shared_ptr<std::atomic_bool> ErrorFlag;
+
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
new file mode 100644
index 0000000000..22df051dfe
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
@@ -0,0 +1,117 @@
+#include "topic_workload_run_write.h"
+
+#include "topic_workload_defines.h"
+#include "topic_workload_params.h"
+#include "topic_workload_writer.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <util/generic/guid.h>
+
+#include <sstream>
+#include <future>
+#include <thread>
+#include <iomanip>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTopicRunWrite::TCommandWorkloadTopicRunWrite()
+ : TWorkloadCommand("write", {}, "Write workload")
+ , ErrorFlag(std::make_shared<std::atomic_bool>())
+{
+}
+
+void TCommandWorkloadTopicRunWrite::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ // Common params
+ config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.")
+ .DefaultValue(10)
+ .StoreResult(&Seconds);
+ config.Opts->AddLongOption('w', "window", "Output window duration in seconds.")
+ .DefaultValue(1)
+ .StoreResult(&WindowDurationSec);
+ config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.")
+ .StoreTrue(&Quiet);
+ config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.")
+ .StoreTrue(&PrintTimestamp);
+
+ // Specific params
+ config.Opts->AddLongOption('t', "threads", "Number of producer threads.")
+ .DefaultValue(1)
+ .StoreResult(&ProducerThreadCount);
+ config.Opts->AddLongOption('m', "message-size", "Message size.")
+ .DefaultValue(10_KB)
+ .StoreResult(&MessageSize);
+ config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.")
+ .DefaultValue(0)
+ .StoreResult(&MessageRate);
+ config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.")
+ .DefaultValue(0)
+ .StoreResult(&ByteRate);
+ config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs()))
+ .Optional()
+ .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW))
+ .StoreMappedResultT<TString>(&Codec, &TCommandWorkloadTopicParams::StrToCodec);
+
+ config.Opts->MutuallyExclusive("message-rate", "byte-rate");
+}
+
+void TCommandWorkloadTopicRunWrite::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTopicRunWrite::Run(TConfig& config) {
+ Log = std::make_shared<TLog>(CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)));
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
+ StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(true, false, Quiet, PrintTimestamp, WindowDurationSec, Seconds, ErrorFlag);
+ StatsCollector->PrintHeader();
+
+ std::vector<std::future<void>> threads;
+
+ auto producerStartedCount = std::make_shared<std::atomic_uint>();
+ for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) {
+ TTopicWorkloadWriterParams writerParams{
+ .Seconds = Seconds,
+ .Driver = Driver.get(),
+ .Log = Log,
+ .StatsCollector = StatsCollector,
+ .ErrorFlag = ErrorFlag,
+ .StartedCount = producerStartedCount,
+
+ .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
+ .ProducerThreadCount = ProducerThreadCount,
+ .MessageGroupId = TStringBuilder() << PRODUCER_PREFIX << "-" << CreateGuidAsString() << "-" << MESSAGE_GROUP_ID_PREFIX << '-' << writerIdx,
+ .MessageSize = MessageSize,
+ .Codec = Codec};
+
+ threads.push_back(std::async([writerParams = std::move(writerParams)] () mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); }));
+ }
+
+ while (*producerStartedCount != ProducerThreadCount)
+ Sleep(TDuration::MilliSeconds(10));
+
+ StatsCollector->PrintWindowStatsLoop();
+
+ for (auto& future : threads) {
+ future.wait();
+ WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined.\n");
+ }
+
+ StatsCollector->PrintTotalStats();
+
+ if (*ErrorFlag) {
+ WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "Problems occured while writing messages.\n")
+ return EXIT_FAILURE;
+ }
+
+ if (StatsCollector->GetTotalWriteMessages() == 0) {
+ WRITE_LOG(Log, ELogPriority::TLOG_EMERG, "No messages were written.\n");
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h
new file mode 100644
index 0000000000..b9f19d6692
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "topic_workload_stats_collector.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+#include <library/cpp/logger/log.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TCommandWorkloadTopicRunWrite: public TWorkloadCommand {
+ public:
+ TCommandWorkloadTopicRunWrite();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+ private:
+ size_t Seconds;
+ size_t MessageRate;
+ size_t ByteRate;
+ size_t MessageSize;
+ ui32 Codec;
+
+ ui32 ProducerThreadCount;
+
+ std::shared_ptr <TLog> Log;
+ std::shared_ptr<std::atomic_bool> ErrorFlag;
+
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+ };
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
new file mode 100644
index 0000000000..394cf2bfbe
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
@@ -0,0 +1,37 @@
+#include "topic_workload_stats.h"
+
+using namespace NYdb::NConsoleClient;
+
+TTopicWorkloadStats::TTopicWorkloadStats()
+ : WriteBytes(0)
+ , WriteMessages(0)
+ , WriteTimeHist(60000, 2)
+ , InflightMessages(0)
+ , LagMessages(0)
+ , LagTimeHist(60000, 2)
+ , ReadBytes(0)
+ , ReadMessages(0)
+ , FullTimeHist(60000, 5)
+{
+}
+
+void TTopicWorkloadStats::AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages)
+{
+ WriteMessages++;
+ WriteBytes += messageSize;
+ WriteTimeHist.RecordValue(writeTime);
+ InflightMessages = Max(InflightMessages, inflightMessages);
+}
+
+void TTopicWorkloadStats::AddReaderEvent(ui64 messageSize, ui64 fullTime)
+{
+ ReadMessages++;
+ ReadBytes += messageSize;
+ FullTimeHist.RecordValue(fullTime);
+}
+
+void TTopicWorkloadStats::AddLagEvent(ui64 lagMessages, ui64 lagTime)
+{
+ LagMessages = Max(LagMessages, lagMessages);
+ LagTimeHist.RecordValue(lagTime);
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
new file mode 100644
index 0000000000..f76cdd4b9f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TTopicWorkloadStats {
+ public:
+ TTopicWorkloadStats();
+
+ void AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages);
+ void AddReaderEvent(ui64 messageSize, ui64 fullTime);
+ void AddLagEvent(ui64 lagMessages, ui64 lagTime);
+
+ ui64 WriteBytes;
+ ui64 WriteMessages;
+ NHdr::THistogram WriteTimeHist;
+ ui64 InflightMessages;
+ ui64 LagMessages;
+ NHdr::THistogram LagTimeHist;
+ ui64 ReadBytes;
+ ui64 ReadMessages;
+ NHdr::THistogram FullTimeHist;
+ };
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
new file mode 100644
index 0000000000..63d45a4439
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
@@ -0,0 +1,121 @@
+#include "topic_workload_stats_collector.h"
+
+#include "topic_workload_defines.h"
+
+using namespace NYdb::NConsoleClient;
+
+TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector(
+ bool producer, bool consumer,
+ bool quiet, bool printTimestamp,
+ ui32 windowDurationSec, ui32 totalDurationSec,
+ std::shared_ptr<std::atomic_bool> errorFlag)
+ : Producer(producer)
+ , Consumer(consumer)
+ , Quiet(quiet)
+ , PrintTimestamp(printTimestamp)
+ , WindowDurationSec(windowDurationSec)
+ , TotalDurationSec(totalDurationSec)
+ , ErrorFlag(errorFlag)
+ , WindowStats(MakeHolder<TTopicWorkloadStats>())
+{
+}
+
+void TTopicWorkloadStatsCollector::PrintHeader() const {
+ if (Quiet)
+ return;
+
+ Cout << "Window\t" << (Producer ? "Write speed\tWrite time\tInflight\t" : "") << (Consumer ? "Lag\tLag time\tRead speed\tFull time\t" : "") << (PrintTimestamp ? "Timestamp" : "") << Endl;
+ Cout << "#\t" << (Producer ? "msg/s\tMB/s\tP99(ms)\t\tmax msg\t\t" : "") << (Consumer ? "max msg\tP99(ms)\t\tmsg/s\tMB/s\tP99(ms)" : "");
+ Cout << Endl;
+}
+
+void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() {
+ auto StartTime = Now();
+ auto StopTime = StartTime + TDuration::Seconds(TotalDurationSec + 1);
+ int windowIt = 1;
+ auto windowDuration = TDuration::Seconds(WindowDurationSec);
+ while (Now() < StopTime && !*ErrorFlag) {
+ if (Now() > StartTime + windowIt * windowDuration && !*ErrorFlag) {
+ PrintWindowStats(windowIt++);
+ }
+ Sleep(std::max(TDuration::Zero(), Now() - StartTime - windowIt * windowDuration));
+ }
+}
+
+void TTopicWorkloadStatsCollector::PrintWindowStats(ui32 windowIt) {
+ PrintStats(windowIt);
+
+ with_lock (Lock) {
+ WindowStats = MakeHolder<TTopicWorkloadStats>();
+ }
+}
+void TTopicWorkloadStatsCollector::PrintTotalStats() const {
+ PrintHeader();
+ PrintStats({});
+}
+
+void TTopicWorkloadStatsCollector::PrintStats(TMaybe<ui32> windowIt) const {
+ if (Quiet)
+ return;
+
+ with_lock (Lock) {
+ const auto& stats = windowIt.Empty() ? TotalStats : *WindowStats;
+ double seconds = windowIt.Empty() ? TotalDurationSec : WindowDurationSec;
+ TString totalIt = windowIt.Empty() ? "Total" : std::to_string(windowIt.GetRef());
+
+ Cout << totalIt;
+ if (Producer) {
+ Cout << "\t" << (int)(stats.WriteMessages / seconds)
+ << "\t" << (int)(stats.WriteBytes / seconds / 1024 / 1024)
+ << "\t" << stats.WriteTimeHist.GetValueAtPercentile(99) << "\t"
+ << "\t" << stats.InflightMessages << "\t";
+ }
+ if (Consumer) {
+ Cout << "\t" << stats.LagMessages
+ << "\t" << stats.LagTimeHist.GetValueAtPercentile(99) << "\t"
+ << "\t" << (int)(stats.ReadMessages / seconds)
+ << "\t" << (int)(stats.ReadBytes / seconds / 1024 / 1024)
+ << "\t" << stats.FullTimeHist.GetValueAtPercentile(99) << "\t";
+ }
+ if (PrintTimestamp) {
+ Cout << "\t" << Now().ToStringUpToSeconds();
+ }
+ Cout << Endl;
+ }
+}
+
+ui64 TTopicWorkloadStatsCollector::GetTotalReadMessages() const {
+ with_lock (Lock) {
+ return TotalStats.ReadMessages;
+ }
+}
+
+ui64 TTopicWorkloadStatsCollector::GetTotalWriteMessages() const {
+ with_lock (Lock) {
+ return TotalStats.WriteMessages;
+ }
+}
+
+void TTopicWorkloadStatsCollector::AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages)
+{
+ with_lock (Lock) {
+ WindowStats->AddWriterEvent(messageSize, writeTime, inflightMessages);
+ TotalStats.AddWriterEvent(messageSize, writeTime, inflightMessages);
+ }
+}
+
+void TTopicWorkloadStatsCollector::AddReaderEvent(ui64 messageSize, ui64 fullTime)
+{
+ with_lock (Lock) {
+ WindowStats->AddReaderEvent(messageSize, fullTime);
+ TotalStats.AddReaderEvent(messageSize, fullTime);
+ }
+}
+
+void TTopicWorkloadStatsCollector::AddLagEvent(ui64 lagMessages, ui64 lagTime)
+{
+ with_lock (Lock) {
+ WindowStats->AddLagEvent(lagMessages, lagTime);
+ TotalStats.AddLagEvent(lagMessages, lagTime);
+ }
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
new file mode 100644
index 0000000000..7fae55204a
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "topic_workload_stats.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+ class TTopicWorkloadStatsCollector {
+ public:
+ TTopicWorkloadStatsCollector(
+ bool producer, bool consumer,
+ bool quiet, bool printTimestamp,
+ ui32 windowDurationSec, ui32 totalDurationSec,
+ std::shared_ptr<std::atomic_bool> errorFlag);
+
+ void PrintWindowStatsLoop();
+
+ void PrintHeader() const;
+ void PrintTotalStats() const;
+
+ void AddWriterEvent(ui64 messageSize, ui64 writeTime, ui64 inflightMessages);
+ void AddReaderEvent(ui64 messageSize, ui64 fullTime);
+ void AddLagEvent(ui64 lagMessages, ui64 lagTime);
+
+ ui64 GetTotalReadMessages() const;
+ ui64 GetTotalWriteMessages() const;
+
+ private:
+ void PrintWindowStats(ui32 windowIt);
+ void PrintStats(TMaybe<ui32> windowIt) const;
+
+ bool Producer;
+ bool Consumer;
+
+ bool Quiet;
+ bool PrintTimestamp;
+
+ double WindowDurationSec;
+ double TotalDurationSec;
+
+ TSpinLock Lock;
+ std::shared_ptr<std::atomic_bool> ErrorFlag;
+
+ THolder<TTopicWorkloadStats> WindowStats;
+ TTopicWorkloadStats TotalStats;
+ };
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
new file mode 100644
index 0000000000..32d455a936
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
@@ -0,0 +1,219 @@
+#include "topic_workload_writer.h"
+
+#include <util/generic/overloaded.h>
+
+using namespace NYdb::NConsoleClient;
+
+TTopicWorkloadWriterWorker::TTopicWorkloadWriterWorker(
+ TTopicWorkloadWriterParams&& params)
+ : Params(params)
+ , MessageId(1)
+ , StartTimestamp(TInstant::Now())
+ , StatsCollector(params.StatsCollector)
+
+{
+ Closed = std::make_shared<std::atomic<bool>>(false);
+ GenerateMessages();
+ CreateWorker();
+}
+
+TTopicWorkloadWriterWorker::~TTopicWorkloadWriterWorker()
+{
+ if (WriteSession)
+ WriteSession->Close();
+}
+
+void TTopicWorkloadWriterWorker::CreateWorker() {
+ if (!InitSeqNoProcessed) {
+ *Params.ErrorFlag = 1;
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, "WRITER no progress while writing by protocol.\n");
+ return;
+ }
+
+ MessageGroupId = Params.MessageGroupId;
+
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER create worker for " << MessageGroupId << ".\n");
+
+ if (WriteSession)
+ WriteSession->Close();
+
+ CreateTopicWorker();
+}
+
+void TTopicWorkloadWriterWorker::Close() {
+ Closed->store(true);
+ if (WriteSession)
+ WriteSession->Close(TDuration::Zero());
+}
+
+const size_t GENERATED_MESSAGES_COUNT = 32;
+
+void TTopicWorkloadWriterWorker::GenerateMessages() {
+ TStringBuilder res;
+ for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) {
+ res.clear();
+ while (res.Size() < Params.MessageSize)
+ res << RandomNumber<ui64>(UINT64_MAX);
+ GeneratedMessages.push_back(res);
+ }
+}
+
+TString TTopicWorkloadWriterWorker::GetGeneratedMessage() const {
+ return GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT];
+}
+
+TDuration TTopicWorkloadWriterWorker::Process() {
+ if (!InitSeqNoProcessed) {
+ if (!InitSeqNo.HasValue() && !InitSeqNo.Wait(TDuration::Seconds(1))) {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, "WRITER no initial sequence number\n");
+ return TDuration::Seconds(1);
+ }
+ if (InitSeqNo.HasException()) {
+ try {
+ InitSeqNo.GetValue();
+ } catch (yexception e) {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "Future exception: " << e.what() << "\n");
+ }
+ *Params.ErrorFlag = 1;
+ return TDuration::Zero();
+ }
+
+ InitSeqNoProcessed = true;
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER sequence number initialized " << InitSeqNo.GetValue() << "\n");
+ if (MessageId != InitSeqNo.GetValue() + 1) {
+ MessageId = InitSeqNo.GetValue() + 1;
+ AckedMessageId = MessageId - 1;
+ }
+ }
+
+ ui64 elapsedSeconds = (Now() - StartTimestamp).Seconds();
+ const ui64 bytesMustBeWritten = Params.ByteRate == 0 ? UINT64_MAX : elapsedSeconds * Params.ByteRate / Params.ProducerThreadCount;
+
+ while (true) {
+ auto events = WriteSession->GetEvents(false);
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Got " << events.size() << " events \n");
+
+ for (auto& e : events)
+ ProcessEvent(e);
+
+ if (BytesWritten < bytesMustBeWritten && ContinuationToken.Defined()) {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Writing message " << MessageId << "\n");
+
+ TString data = GetGeneratedMessage();
+ size_t messageSize = data.size();
+
+ TMaybe<TInstant> createTimestamp =
+ Params.ByteRate == 0 ? TMaybe<TInstant>(Nothing())
+ : StartTimestamp + TDuration::Seconds((double)BytesWritten / Params.ByteRate * Params.ProducerThreadCount);
+
+ InflightMessages[MessageId] = {messageSize, createTimestamp.GetOrElse(Now())};
+
+ BytesWritten += messageSize;
+
+ WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId++, createTimestamp);
+ ContinuationToken.Clear();
+ }
+
+ if (events.empty())
+ break;
+ }
+
+ const TDuration timeToNextMessage = TDuration::MilliSeconds(50);
+ return timeToNextMessage;
+}
+
+bool TTopicWorkloadWriterWorker::ProcessEvent(
+ NYdb::NTopic::TWriteSessionEvent::TEvent& event) {
+ return std::visit(
+ TOverloaded{
+ [this](const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ return ProcessAckEvent(event);
+ },
+ [this](NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ return ProcessReadyToAcceptEvent(event);
+ },
+ [this](const NYdb::NTopic::TSessionClosedEvent& event) {
+ return ProcessSessionClosedEvent(event);
+ }}, event);
+};
+
+bool TTopicWorkloadWriterWorker::ProcessAckEvent(
+ const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ bool hasProgress = false;
+ //! Acks just confirm that message was received and saved by server
+ //! successfully. Here we just count acked messages to check, that everything
+ //! written is confirmed.
+ for (const auto& ack : event.Acks) {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "WRITER Got ack for write " << AckedMessageId << "\n");
+ AckedMessageId = ack.SeqNo;
+
+ auto inflightMessageIter = InflightMessages.find(AckedMessageId);
+ if (inflightMessageIter == InflightMessages.end())
+ {
+ *Params.ErrorFlag = 1;
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "WRITER Unknown AckedMessageId " << AckedMessageId << "\n");
+ return false;
+ }
+
+ ui64 writeTime = (Now() - inflightMessageIter->second.MessageTime).MilliSeconds();
+ ui64 messageSize = inflightMessageIter->second.MessageSize;
+ InflightMessages.erase(inflightMessageIter);
+
+ StatsCollector->AddWriterEvent(messageSize, writeTime, InflightMessages.size());
+
+ hasProgress = true;
+ }
+ return hasProgress;
+}
+
+bool TTopicWorkloadWriterWorker::ProcessReadyToAcceptEvent(
+ NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ //! TReadyToAcceptEvent provide continue tokens - an object to perform further
+ //! writes.
+ //! Do NOT lose continue tokens!
+
+ ContinuationToken = std::move(event.ContinuationToken);
+
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, "WRITER Got new continue token\n");
+
+ return true;
+}
+
+bool TTopicWorkloadWriterWorker::ProcessSessionClosedEvent(
+ const NYdb::NTopic::TSessionClosedEvent& event) {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER Got close event: " << event.DebugString() << "\n");
+ //! Session is closed, stop any work with it.
+ Y_FAIL("session closed");
+ return false;
+}
+
+void TTopicWorkloadWriterWorker::CreateTopicWorker() {
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, "WRITER Creating worker\n");
+ Y_VERIFY(Params.Driver);
+ NYdb::NTopic::TWriteSessionSettings settings;
+ settings.Codec((NYdb::NTopic::ECodec)Params.Codec);
+ settings.Path(TOPIC);
+ settings.ProducerId(MessageGroupId).MessageGroupId(MessageGroupId);
+ WriteSession = NYdb::NTopic::TTopicClient(*Params.Driver).CreateWriteSession(settings);
+ InitSeqNo = WriteSession->GetInitSeqNo();
+ InitSeqNoProcessed = false;
+}
+
+void TTopicWorkloadWriterWorker::WriterLoop(TTopicWorkloadWriterParams&& params) {
+ TTopicWorkloadWriterWorker writer(std::move(params));
+
+ (*params.StartedCount)++;
+
+ WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER started " << Now().ToStringUpToSeconds() << "\n");
+
+ auto endTime = TInstant::Now() + TDuration::Seconds(params.Seconds);
+ while (Now() < endTime && !*params.ErrorFlag) {
+ TDuration delta = TDuration::MilliSeconds(100);
+ auto d = writer.Process();
+ delta = Min(d, delta);
+ Sleep(delta);
+ }
+
+
+ WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "WRITER finished " << Now().ToStringUpToSeconds() << "\n");
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
new file mode 100644
index 0000000000..70a7b461c8
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include "topic_workload_defines.h"
+#include "topic_workload_stats_collector.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <library/cpp/logger/log.h>
+#include <util/generic/string.h>
+
+namespace NYdb {
+ namespace NConsoleClient {
+
+ struct TTopicWorkloadWriterParams {
+ size_t Seconds;
+ NYdb::TDriver* Driver;
+ std::shared_ptr<TLog> Log;
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+ std::shared_ptr<std::atomic<bool>> ErrorFlag;
+ std::shared_ptr<std::atomic_uint> StartedCount;
+
+ size_t ByteRate;
+ ui32 ProducerThreadCount;
+ TString MessageGroupId;
+ size_t MessageSize;
+ ui32 Codec = 0;
+ };
+
+ class TTopicWorkloadWriterWorker {
+ public:
+ TTopicWorkloadWriterWorker(TTopicWorkloadWriterParams&& params);
+ ~TTopicWorkloadWriterWorker();
+
+ void Close();
+
+ TDuration Process();
+
+ void CreateWorker();
+
+ void CreateTopicWorker();
+
+ static void WriterLoop(TTopicWorkloadWriterParams&& params);
+
+ private:
+ bool ProcessAckEvent(const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event);
+
+ bool ProcessReadyToAcceptEvent(NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event);
+ bool ProcessSessionClosedEvent(const NYdb::NTopic::TSessionClosedEvent& event);
+
+ bool ProcessEvent(NYdb::NTopic::TWriteSessionEvent::TEvent& event);
+
+ TString GetGeneratedMessage() const;
+ void GenerateMessages();
+
+
+
+ TTopicWorkloadWriterParams Params;
+ ui64 MessageId = 0;
+ ui64 AckedMessageId = 0;
+ ui64 BytesWritten = 0;
+ TString MessageGroupId;
+ std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession;
+ TInstant StartTimestamp;
+
+ std::vector<TString> GeneratedMessages;
+
+ NThreading::TFuture<ui64> InitSeqNo;
+ bool InitSeqNoProcessed = true;
+ TMaybe<NTopic::TContinuationToken> ContinuationToken;
+
+ std::shared_ptr<std::atomic<bool>> Closed;
+ std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
+
+ struct TInflightMessage {
+ size_t MessageSize;
+ TInstant MessageTime;
+ };
+ THashMap<ui64, TInflightMessage> InflightMessages;
+ };
+ }
+} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index ea0b2d8584..ba67085b69 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -77,24 +77,9 @@ namespace NYdb::NConsoleClient {
bool IsStreamingFormat(EMessagingFormat format) {
return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated;
}
-
- ELogPriority VerbosityLevelToELogPriority(TClientCommand::TConfig::EVerbosityLevel lvl) {
- switch (lvl) {
- case TClientCommand::TConfig::EVerbosityLevel::NONE:
- return ELogPriority::TLOG_EMERG;
- case TClientCommand::TConfig::EVerbosityLevel::DEBUG:
- return ELogPriority::TLOG_DEBUG;
- case TClientCommand::TConfig::EVerbosityLevel::INFO:
- return ELogPriority::TLOG_INFO;
- case TClientCommand::TConfig::EVerbosityLevel::WARN:
- return ELogPriority::TLOG_WARNING;
- default:
- return ELogPriority::TLOG_EMERG;
- }
- }
} // namespace
- namespace {
+
TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) {
TStringStream description;
description << descriptionPrefix << ". Available codecs: ";
@@ -109,20 +94,21 @@ namespace NYdb::NConsoleClient {
return description.Str();
}
+
+namespace {
+ NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) {
+ auto exists = ExistingCodecs.find(to_lower(codecStr));
+ if (exists == ExistingCodecs.end()) {
+ throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
+ }
- NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) {
- auto exists = ExistingCodecs.find(to_lower(codecStr));
- if (exists == ExistingCodecs.end()) {
- throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
- }
+ if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) {
+ throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
+ }
- if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) {
- throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
+ return exists->second;
}
-
- return exists->second;
}
- }
void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
TString description = PrepareAllowedCodecsDescription("Comma-separated list of supported codecs", supportedCodecs);
@@ -655,7 +641,7 @@ namespace NYdb::NConsoleClient {
ValidateConfig();
auto driver =
- std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", VerbosityLevelToELogPriority(config.VerbosityLevel))));
+ std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
NTopic::TTopicClient topicClient(*driver);
auto readSession = topicClient.CreateReadSession(PrepareReadSessionSettings());
@@ -783,7 +769,7 @@ namespace NYdb::NConsoleClient {
SetInterruptHandlers();
auto driver =
- std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", VerbosityLevelToELogPriority(config.VerbosityLevel))));
+ std::make_unique<TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
NTopic::TTopicClient topicClient(*driver);
{
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 5daa65a4a5..ca8af79c62 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -10,6 +10,7 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
namespace NYdb::NConsoleClient {
+ TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs);
TVector<NYdb::NTopic::ECodec> InitAllowedCodecs();
const TVector<NYdb::NTopic::ECodec> AllowedCodecs = InitAllowedCodecs();
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index c7b2744d65..8e710f497d 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -3,6 +3,7 @@
#include "stock_workload.h"
#include "kv_workload.h"
#include "click_bench.h"
+#include "topic_workload/topic_workload.h"
#include <ydb/library/workload/workload_factory.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
@@ -38,6 +39,7 @@ TCommandWorkload::TCommandWorkload()
AddCommand(std::make_unique<TCommandStock>());
AddCommand(std::make_unique<TCommandKv>());
AddCommand(std::make_unique<TCommandClickBench>());
+ AddCommand(std::make_unique<TCommandWorkloadTopic>());
}
TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description)
diff --git a/ydb/public/lib/ydb_cli/common/command.cpp b/ydb/public/lib/ydb_cli/common/command.cpp
index 13d624b651..250089e4f9 100644
--- a/ydb/public/lib/ydb_cli/common/command.cpp
+++ b/ydb/public/lib/ydb_cli/common/command.cpp
@@ -98,6 +98,21 @@ TClientCommand::TClientCommand(const TString& name, const std::initializer_list<
Opts.SetWrap(Max(Opts.Wrap_, static_cast<ui32>(TermWidth())));
}
+ELogPriority TClientCommand::TConfig::VerbosityLevelToELogPriority(TClientCommand::TConfig::EVerbosityLevel lvl) {
+ switch (lvl) {
+ case TClientCommand::TConfig::EVerbosityLevel::NONE:
+ return ELogPriority::TLOG_EMERG;
+ case TClientCommand::TConfig::EVerbosityLevel::DEBUG:
+ return ELogPriority::TLOG_DEBUG;
+ case TClientCommand::TConfig::EVerbosityLevel::INFO:
+ return ELogPriority::TLOG_INFO;
+ case TClientCommand::TConfig::EVerbosityLevel::WARN:
+ return ELogPriority::TLOG_WARNING;
+ default:
+ return ELogPriority::TLOG_ERR;
+ }
+}
+
TClientCommand::TOptsParseOneLevelResult::TOptsParseOneLevelResult(TConfig& config) {
int _argc = 1;
int levels = 1;
diff --git a/ydb/public/lib/ydb_cli/common/command.h b/ydb/public/lib/ydb_cli/common/command.h
index c3c02ba86c..4ce1aad157 100644
--- a/ydb/public/lib/ydb_cli/common/command.h
+++ b/ydb/public/lib/ydb_cli/common/command.h
@@ -6,6 +6,7 @@
#include <library/cpp/getopt/last_getopt.h>
#include <library/cpp/colorizer/colors.h>
+#include <library/cpp/logger/priority.h>
#include <util/generic/strbuf.h>
#include <util/generic/vector.h>
#include <util/charset/utf8.h>
@@ -70,6 +71,8 @@ public:
DEBUG = 3,
};
+ static ELogPriority VerbosityLevelToELogPriority(EVerbosityLevel lvl);
+
int ArgC;
char** ArgV;
int InitialArgC;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 4d9d519224..b2d24a177c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -258,7 +258,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
std::function<void(bool)> connectTimeoutCallback;
if (!status.Ok()) {
- LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status
+ LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << IssuesSingleLineString(status.Issues));
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 083ec713c6..76d782a954 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -98,7 +98,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
- LOG_LAZY(DbDriverState->Log, TLOG_INFO,
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR,
LogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << IssuesSingleLineString(status.Issues)
);
@@ -119,7 +119,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
ResetForRetryImpl();
} else {
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 992eabbc38..3ffa76eab6 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -98,7 +98,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
return result;
}
LOG_LAZY(DbDriverState->Log,
- TLOG_INFO,
+ TLOG_ERR,
LogPrefix() << "Got error. Status: " << status.Status
<< ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues)
);
@@ -119,7 +119,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
ResetForRetryImpl();
} else {
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error");
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error");
result.DoStop = true;
CheckHandleResultImpl(result);
}