aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-07-06 14:17:38 +0300
committerabcdef <akotov@ydb.tech>2023-07-06 14:17:38 +0300
commitf3493e9e24c116f8dbd8508ba0742dad79ba1e6d (patch)
tree01c45623fb9ccca6e106fa33694f0aeddc00a9c3
parent047078a0b267f9511823cfd780ea14573dc0af56 (diff)
downloadydb-f3493e9e24c116f8dbd8508ba0742dad79ba1e6d.tar.gz
add a branch of `workload transfer topic-to-table` commands
добавил команды `workload transfer topic-to-table init|clean`
-rw-r--r--ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/apps/ydb/ut/workload-topic.cpp (renamed from ydb/apps/ydb/ut/main.cpp)0
-rw-r--r--ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp160
-rw-r--r--ydb/apps/ydb/ut/ya.make6
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp23
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h17
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp4
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt35
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt36
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt36
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.txt17
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt35
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp10
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.h12
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp8
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h10
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp17
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.h12
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp43
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.h24
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp69
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h26
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make28
-rw-r--r--ydb/public/lib/ydb_cli/commands/ya.make1
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp2
30 files changed, 645 insertions, 6 deletions
diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
index 28cd8ac960..e8a767d104 100644
--- a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
@@ -14,6 +14,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC
yutil
cpp-testing-unittest_main
cpp-client-ydb_topic
+ cpp-client-ydb_table
)
target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
@@ -26,7 +27,8 @@ target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
)
target_sources(ydb-apps-ydb-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/workload-topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
)
set_property(
TARGET
diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
index fa8161df2d..e194688952 100644
--- a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
cpp-client-ydb_topic
+ cpp-client-ydb_table
)
target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
@@ -27,7 +28,8 @@ target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
)
target_sources(ydb-apps-ydb-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/workload-topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
)
set_property(
TARGET
diff --git a/ydb/apps/ydb/ut/main.cpp b/ydb/apps/ydb/ut/workload-topic.cpp
index 056561dd13..056561dd13 100644
--- a/ydb/apps/ydb/ut/main.cpp
+++ b/ydb/apps/ydb/ut/workload-topic.cpp
diff --git a/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
new file mode 100644
index 0000000000..c966eebd7d
--- /dev/null
+++ b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
@@ -0,0 +1,160 @@
+#include <library/cpp/testing/common/env.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <util/string/cast.h>
+#include <util/string/split.h>
+#include <util/system/env.h>
+#include <util/system/shellcommand.h>
+
+Y_UNIT_TEST_SUITE(YdbWorkloadTransferTopicToTable) {
+
+struct TTopicConfigMatcher {
+ TString Name = "transfer-topic";
+ ui32 Partitions = 128;
+ ui32 Consumers = 1;
+};
+
+struct TTableConfigMatcher {
+ TString Name = "transfer-table";
+ ui32 Partitions = 128;
+};
+
+TString GetYdbEndpoint()
+{
+ return GetEnv("YDB_ENDPOINT");
+}
+
+TString GetYdbDatabase()
+{
+ return GetEnv("YDB_DATABASE");
+}
+
+NYdb::NTable::TSession GetSession(NYdb::NTable::TTableClient& client)
+{
+ auto result = client.GetSession().GetValueSync();
+ return result.GetSession();
+}
+
+void ExpectTopic(const TTopicConfigMatcher& matcher)
+{
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(GetYdbEndpoint());
+ config.SetDatabase(GetYdbDatabase());
+
+ NYdb::TDriver driver(config);
+ NYdb::NTopic::TTopicClient client(driver);
+
+ auto result = client.DescribeTopic(matcher.Name).GetValueSync();
+ if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
+ UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
+ UNIT_ASSERT_VALUES_EQUAL(0, matcher.Consumers);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
+
+ auto& description = result.GetTopicDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), matcher.Partitions);
+ UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers);
+ }
+}
+
+void ExpectTable(const TTableConfigMatcher& matcher)
+{
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(GetYdbEndpoint());
+ config.SetDatabase(GetYdbDatabase());
+
+ NYdb::TDriver driver(config);
+ NYdb::NTable::TTableClient client(driver);
+ auto session = GetSession(client);
+
+ NYdb::NTable::TDescribeTableSettings options;
+ options.WithTableStatistics(true);
+
+ auto result = session.DescribeTable("/" + GetYdbDatabase() + "/" + matcher.Name,
+ options).GetValueSync();
+ if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
+ UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
+
+ auto description = result.GetTableDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(description.GetPartitionsCount(), matcher.Partitions);
+ }
+}
+
+TString ExecYdb(TList<TString> args)
+{
+ //
+ // ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload topic ${args}
+ //
+ TShellCommand command(BinaryPath("ydb/apps/ydb/ydb"));
+
+ command << "-e" << ("grpc://" + GetYdbEndpoint());
+ command << "-d" << ("/" + GetYdbDatabase());
+ command << "workload" << "transfer" << "topic-to-table";
+
+ for (auto& arg : args) {
+ command << arg;
+ }
+
+ command.Run().Wait();
+
+ if (command.GetExitCode() != 0) {
+ ythrow yexception() << "command `" << command.GetQuotedCommand() << "` exit with code " << command.GetExitCode();
+ }
+
+ return command.GetOutput();
+}
+
+void RunYdb(const TList<TString>& args,
+ const TString& topic, ui32 topicPartitions, ui32 consumers,
+ const TString& table, ui32 tablePartitions)
+{
+ ExecYdb(args);
+ ExpectTopic({.Name=topic, .Partitions=topicPartitions, .Consumers=consumers});
+ ExpectTable({.Name=table, .Partitions=tablePartitions});
+}
+
+Y_UNIT_TEST(Default_Init_Clean)
+{
+ const TString topic = "transfer-topic";
+ const TString table = "transfer-table";
+
+ RunYdb({"init"}, topic, 128, 1, table, 128);
+ RunYdb({"clean"}, topic, 0, 0, table, 0);
+}
+
+Y_UNIT_TEST(Specific_Init_Clean)
+{
+ const TString topic = "my-topic";
+ const TString table = "my-table";
+
+ RunYdb({"init",
+ "--topic", topic, "--topic-partitions", "3", "--consumers", "5",
+ "--table", table, "--table-partitions", "8"},
+ topic, 3, 5,
+ table, 8);
+ RunYdb({"clean",
+ "--topic", topic,
+ "--table", table},
+ topic, 0, 0,
+ table, 0);
+}
+
+Y_UNIT_TEST(Clean_Without_Init)
+{
+ UNIT_ASSERT_EXCEPTION(ExecYdb({"clean"}), yexception);
+}
+
+Y_UNIT_TEST(Double_Init)
+{
+ ExecYdb({"init"});
+ UNIT_ASSERT_EXCEPTION(ExecYdb({"init"}), yexception);
+}
+
+}
diff --git a/ydb/apps/ydb/ut/ya.make b/ydb/apps/ydb/ut/ya.make
index 34238a1374..6063398822 100644
--- a/ydb/apps/ydb/ut/ya.make
+++ b/ydb/apps/ydb/ut/ya.make
@@ -5,13 +5,17 @@ DEPENDS(
)
SRCS(
- main.cpp
+ workload-topic.cpp
+ workload-transfer-topic-to-table.cpp
)
PEERDIR(
ydb/public/sdk/cpp/client/ydb_topic
+ ydb/public/sdk/cpp/client/ydb_table
)
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
+REQUIREMENTS(ram:16)
+
END()
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
index 800586248a..ffaf180e1a 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(topic_workload)
+add_subdirectory(transfer_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -27,6 +28,7 @@ target_link_libraries(clicommands PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -89,6 +91,7 @@ target_link_libraries(clicommands.global PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
index 6300cfff7f..50cb264288 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
@@ -7,6 +7,7 @@
add_subdirectory(topic_workload)
+add_subdirectory(transfer_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -28,6 +29,7 @@ target_link_libraries(clicommands PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -91,6 +93,7 @@ target_link_libraries(clicommands.global PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
index 6300cfff7f..50cb264288 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(topic_workload)
+add_subdirectory(transfer_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -28,6 +29,7 @@ target_link_libraries(clicommands PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -91,6 +93,7 @@ target_link_libraries(clicommands.global PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
index 800586248a..ffaf180e1a 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(topic_workload)
+add_subdirectory(transfer_workload)
get_built_tool_path(
TOOL_rescompiler_bin
TOOL_rescompiler_dependency
@@ -27,6 +28,7 @@ target_link_libraries(clicommands PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
@@ -89,6 +91,7 @@ target_link_libraries(clicommands.global PUBLIC
public-lib-operation_id
common
topic_workload
+ transfer_workload
lib-ydb_cli-dump
lib-ydb_cli-import
topic
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
index 2fe4e94b79..fde85a89ff 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
@@ -102,6 +102,22 @@ void TTopicOperationsScenario::DropTopic(const TString& database,
ThrowOnError(result);
}
+void TTopicOperationsScenario::DropTable(const TString& database, const TString& table)
+{
+ NTable::TTableClient client(*Driver);
+ auto session = GetSession(client);
+ auto result = session.DropTable(database + "/" + table).GetValueSync();
+ ThrowOnError(result);
+}
+
+void TTopicOperationsScenario::ExecSchemeQuery(const TString& query)
+{
+ NTable::TTableClient client(*Driver);
+ auto session = GetSession(client);
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ ThrowOnError(result);
+}
+
void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic)
{
Y_VERIFY(Driver);
@@ -136,6 +152,13 @@ void TTopicOperationsScenario::CreateTopic(const TString& topic,
ThrowOnError(result);
}
+NTable::TSession TTopicOperationsScenario::GetSession(NTable::TTableClient& client)
+{
+ auto result = client.GetSession({}).GetValueSync();
+ ThrowOnError(result);
+ return result.GetSession();
+}
+
void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void>>& threads,
const TString& database)
{
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
index 2f3c861c28..8db8551787 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
@@ -21,6 +21,13 @@ class TDriver;
}
+namespace NYdb::NTable {
+
+class TSession;
+class TTableClient;
+
+}
+
namespace NYdb::NConsoleClient {
class TTopicWorkloadStatsCollector;
@@ -41,7 +48,7 @@ public:
bool PrintTimestamp;
double Percentile;
TString TopicName;
- ui32 PartitionCount = 1;
+ ui32 TopicPartitionCount = 1;
ui32 ProducerThreadCount;
ui32 ConsumerThreadCount;
ui32 ConsumerCount;
@@ -49,6 +56,8 @@ public:
size_t MessageRate;
size_t ByteRate;
ui32 Codec;
+ TString TableName;
+ ui32 TablePartitionCount = 1;
protected:
void CreateTopic(const TString& database,
@@ -58,6 +67,10 @@ protected:
void DropTopic(const TString& database,
const TString& topic);
+ void DropTable(const TString& database, const TString& table);
+
+ void ExecSchemeQuery(const TString& query);
+
void StartConsumerThreads(std::vector<std::future<void>>& threads,
const TString& database);
void StartProducerThreads(std::vector<std::future<void>>& threads,
@@ -83,6 +96,8 @@ private:
ui32 partitionCount,
ui32 consumerCount);
+ static NTable::TSession GetSession(NTable::TTableClient& client);
+
static THolder<TLogBackend> MakeLogBackend(TClientCommand::TConfig::EVerbosityLevel level);
void InitLog(const TClientCommand::TConfig& config);
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
index bab243b2df..0183654ba4 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
@@ -10,7 +10,7 @@ using namespace NYdb::NConsoleClient;
int TCommandWorkloadTopicInit::TScenario::DoRun(const TConfig& config)
{
- CreateTopic(config.Database, TopicName, PartitionCount, ConsumerCount);
+ CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount);
return EXIT_SUCCESS;
}
@@ -32,7 +32,7 @@ void TCommandWorkloadTopicInit::Config(TConfig& config)
config.Opts->AddLongOption('p', "partitions", "Number of partitions in the topic.")
.DefaultValue(128)
- .StoreResult(&Scenario.PartitionCount);
+ .StoreResult(&Scenario.TopicPartitionCount);
config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.")
.DefaultValue(1)
.StoreResult(&Scenario.ConsumerCount);
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..dc7f515b84
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(transfer_workload)
+target_link_libraries(transfer_workload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(transfer_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..3bbdc78d3d
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,36 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(transfer_workload)
+target_link_libraries(transfer_workload PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(transfer_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..3bbdc78d3d
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,36 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(transfer_workload)
+target_link_libraries(transfer_workload PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(transfer_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..dc7f515b84
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(transfer_workload)
+target_link_libraries(transfer_workload PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ yql-public-issue
+ public-issue-protos
+ api-grpc
+ api-protos
+ api-protos-annotations
+ public-lib-operation_id
+ lib-operation_id-protos
+ cpp-client-draft
+ cpp-client-ydb_driver
+ cpp-client-ydb_proto
+ cpp-client-ydb_table
+ cpp-client-ydb_topic
+ client-ydb_types-operation
+ client-ydb_types-status
+)
+target_sources(transfer_workload PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
+)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
new file mode 100644
index 0000000000..2ab74a7ea5
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.cpp
@@ -0,0 +1,10 @@
+#include "transfer_workload.h"
+#include "transfer_workload_topic_to_table.h"
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTransfer::TCommandWorkloadTransfer()
+ : TClientCommandTree("transfer", {}, "YDB transfer workload")
+{
+ AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTable>());
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.h
new file mode 100644
index 0000000000..226c83194f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/common/command.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTransfer : public TClientCommandTree {
+public:
+ TCommandWorkloadTransfer();
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
new file mode 100644
index 0000000000..02a7d19536
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
@@ -0,0 +1,8 @@
+#include "transfer_workload_defines.h"
+
+namespace NYdb::NConsoleClient::NWorkloadTransfer {
+
+const TString TOPIC = "transfer-topic";
+const TString TABLE = "transfer-table";
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h
new file mode 100644
index 0000000000..c5df4ab60a
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NYdb::NConsoleClient::NWorkloadTransfer {
+
+extern const TString TOPIC;
+extern const TString TABLE;
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
new file mode 100644
index 0000000000..108696ba5e
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
@@ -0,0 +1,17 @@
+#include "transfer_workload_topic_to_table.h"
+#include "transfer_workload_topic_to_table_init.h"
+#include "transfer_workload_topic_to_table_clean.h"
+#include "transfer_workload_defines.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+#include <util/string/printf.h>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTransferTopicToTable::TCommandWorkloadTransferTopicToTable() :
+ TClientCommandTree("topic-to-table", {}, "Transfer from topic to table")
+{
+ AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableInit>());
+ AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableClean>());
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.h
new file mode 100644
index 0000000000..1910c4b08b
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTransferTopicToTable : public TClientCommandTree {
+public:
+ TCommandWorkloadTransferTopicToTable();
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
new file mode 100644
index 0000000000..fefd50eb56
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
@@ -0,0 +1,43 @@
+#include "transfer_workload_topic_to_table_clean.h"
+#include "transfer_workload_defines.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+using namespace NYdb::NConsoleClient;
+
+int TCommandWorkloadTransferTopicToTableClean::TScenario::DoRun(const TConfig& config)
+{
+ DropTopic(config.Database, TopicName);
+ DropTable(config.Database, TableName);
+
+ return EXIT_SUCCESS;
+}
+
+TCommandWorkloadTransferTopicToTableClean::TCommandWorkloadTransferTopicToTableClean() :
+ TWorkloadCommand("clean", {}, "Deletes objects created at the initialization stage")
+{
+}
+
+void TCommandWorkloadTransferTopicToTableClean::Config(TConfig& config)
+{
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("topic", "Topic name.")
+ .DefaultValue(NWorkloadTransfer::TOPIC)
+ .StoreResult(&Scenario.TopicName);
+ config.Opts->AddLongOption("table", "Table name.")
+ .DefaultValue(NWorkloadTransfer::TABLE)
+ .StoreResult(&Scenario.TableName);
+}
+
+void TCommandWorkloadTransferTopicToTableClean::Parse(TConfig& config)
+{
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTransferTopicToTableClean::Run(TConfig& config)
+{
+ return Scenario.Run(config);
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.h
new file mode 100644
index 0000000000..55acb5848e
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTransferTopicToTableClean : public TWorkloadCommand {
+public:
+ TCommandWorkloadTransferTopicToTableClean();
+
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+private:
+ class TScenario : public TTopicOperationsScenario {
+ int DoRun(const TConfig& config) override;
+ };
+
+ TScenario Scenario;
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
new file mode 100644
index 0000000000..35c52164ad
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
@@ -0,0 +1,69 @@
+#include "transfer_workload_topic_to_table_init.h"
+#include "transfer_workload_defines.h"
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+using namespace NYdb::NConsoleClient;
+
+int TCommandWorkloadTransferTopicToTableInit::TScenario::DoRun(const TConfig& config)
+{
+ CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount);
+ CreateTable(TableName, TablePartitionCount);
+
+ return EXIT_SUCCESS;
+}
+
+void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateTable(const TString& name,
+ ui32 partitionCount)
+{
+ TStringBuilder query;
+ query << "CREATE TABLE `";
+ query << name;
+ query << "` (id Uint64, value String, PRIMARY KEY (id)) WITH (UNIFORM_PARTITIONS = ";
+ query << partitionCount;
+ query << ", AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = ";
+ query << partitionCount;
+ query << ")";
+
+ ExecSchemeQuery(query);
+}
+
+TCommandWorkloadTransferTopicToTableInit::TCommandWorkloadTransferTopicToTableInit() :
+ TWorkloadCommand("init", {}, "Creates and initializes objects")
+{
+}
+
+void TCommandWorkloadTransferTopicToTableInit::Config(TConfig& config)
+{
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("topic", "Topic name.")
+ .DefaultValue(NWorkloadTransfer::TOPIC)
+ .StoreResult(&Scenario.TopicName);
+ config.Opts->AddLongOption("table", "Table name.")
+ .DefaultValue(NWorkloadTransfer::TABLE)
+ .StoreResult(&Scenario.TableName);
+
+ config.Opts->AddLongOption("consumers", "Number of consumers in the topic.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.ConsumerCount);
+
+ config.Opts->AddLongOption("topic-partitions", "Number of partitions in the source topic.")
+ .DefaultValue(128)
+ .StoreResult(&Scenario.TopicPartitionCount);
+ config.Opts->AddLongOption("table-partitions", "Number of partitons in table.")
+ .DefaultValue(128)
+ .StoreResult(&Scenario.TablePartitionCount);
+}
+
+void TCommandWorkloadTransferTopicToTableInit::Parse(TConfig& config)
+{
+ TClientCommand::Parse(config);
+}
+
+int TCommandWorkloadTransferTopicToTableInit::Run(TConfig& config)
+{
+ return Scenario.Run(config);
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h
new file mode 100644
index 0000000000..1995406db4
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTransferTopicToTableInit : public TWorkloadCommand {
+public:
+ TCommandWorkloadTransferTopicToTableInit();
+
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+private:
+ class TScenario : public TTopicOperationsScenario {
+ int DoRun(const TConfig& config) override;
+
+ void CreateTable(const TString& table, ui32 partitionCount);
+ };
+
+ TScenario Scenario;
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make
new file mode 100644
index 0000000000..6fb31b9d6f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make
@@ -0,0 +1,28 @@
+LIBRARY(transfer_workload)
+
+SRCS(
+ transfer_workload.cpp
+ transfer_workload_topic_to_table.cpp
+ transfer_workload_topic_to_table_init.cpp
+ transfer_workload_topic_to_table_clean.cpp
+ transfer_workload_defines.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/public/issue
+ ydb/library/yql/public/issue/protos
+ ydb/public/api/grpc
+ ydb/public/api/protos
+ ydb/public/api/protos/annotations
+ ydb/public/lib/operation_id
+ ydb/public/lib/operation_id/protos
+ ydb/public/sdk/cpp/client/draft
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/sdk/cpp/client/ydb_proto
+ ydb/public/sdk/cpp/client/ydb_table
+ ydb/public/sdk/cpp/client/ydb_topic
+ ydb/public/sdk/cpp/client/ydb_types/operation
+ ydb/public/sdk/cpp/client/ydb_types/status
+)
+
+END()
diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make
index b44ec1cbf4..385696aa83 100644
--- a/ydb/public/lib/ydb_cli/commands/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/ya.make
@@ -43,6 +43,7 @@ PEERDIR(
ydb/public/lib/operation_id
ydb/public/lib/ydb_cli/common
ydb/public/lib/ydb_cli/commands/topic_workload
+ ydb/public/lib/ydb_cli/commands/transfer_workload
ydb/public/lib/ydb_cli/dump
ydb/public/lib/ydb_cli/import
ydb/public/lib/ydb_cli/topic
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index 7640a167d5..26c672d817 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -5,6 +5,7 @@
#include "click_bench.h"
#include "tpch.h"
#include "topic_workload/topic_workload.h"
+#include "transfer_workload/transfer_workload.h"
#include <ydb/library/workload/workload_factory.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
@@ -41,6 +42,7 @@ TCommandWorkload::TCommandWorkload()
AddCommand(std::make_unique<TCommandKv>());
AddCommand(std::make_unique<TCommandClickBench>());
AddCommand(std::make_unique<TCommandWorkloadTopic>());
+ AddCommand(std::make_unique<TCommandWorkloadTransfer>());
AddCommand(std::make_unique<TCommandTpch>());
}