aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-07-05 18:30:18 +0300
committerabcdef <akotov@ydb.tech>2023-07-05 18:30:18 +0300
commit586b53c49dc7b17dc97b2110528f81ec4e081438 (patch)
tree3d659b2dbcc8c3c269324072c8dc9838c21dde6c
parentd5fc94311c69dd2566760a0f129b91d2411c2b3c (diff)
downloadydb-586b53c49dc7b17dc97b2110528f81ec4e081438.tar.gz
scripts for the `workload topic init|clean` commands
сценарии для `workload topic init|clean`
-rw-r--r--ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/apps/ydb/ut/main.cpp68
-rw-r--r--ydb/apps/ydb/ut/ya.make4
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp63
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h13
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp32
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h41
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp46
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h31
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp4
11 files changed, 225 insertions, 79 deletions
diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
index 30da5d464da..28cd8ac960d 100644
--- a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC
contrib-libs-cxxsupp
yutil
cpp-testing-unittest_main
+ cpp-client-ydb_topic
)
target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
index 9792960b278..fa8161df2d9 100644
--- a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC
yutil
library-cpp-cpuid_check
cpp-testing-unittest_main
+ cpp-client-ydb_topic
)
target_link_options(ydb-apps-ydb-ut PRIVATE
-ldl
diff --git a/ydb/apps/ydb/ut/main.cpp b/ydb/apps/ydb/ut/main.cpp
index 0bd3be64cbb..056561dd132 100644
--- a/ydb/apps/ydb/ut/main.cpp
+++ b/ydb/apps/ydb/ut/main.cpp
@@ -1,17 +1,20 @@
#include <library/cpp/testing/common/env.h>
#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
#include <util/string/cast.h>
#include <util/string/split.h>
#include <util/system/env.h>
#include <util/system/shellcommand.h>
+Y_UNIT_TEST_SUITE(YdbWorkloadTopic) {
+
TString ExecYdbWorkloadTopic(TList<TString> args)
{
//
// ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload topic ${args}
//
-
args.push_front("topic");
args.push_front("workload");
@@ -24,12 +27,36 @@ TString ExecYdbWorkloadTopic(TList<TString> args)
TShellCommand command(BinaryPath("ydb/apps/ydb/ydb"), args);
command.Run().Wait();
- UNIT_ASSERT_VALUES_EQUAL(command.GetExitCode(), 0);
+ if (command.GetExitCode() != 0) {
+ ythrow yexception() << "command `" << command.GetQuotedCommand() << "` exit with code " << command.GetExitCode();
+ }
return command.GetOutput();
}
-Y_UNIT_TEST_SUITE(YdbWorkloadTopic) {
+struct TTopicConfigurationMatcher {
+ TString Topic = "workload-topic";
+ ui32 Partitions = 128;
+ ui32 Consumers = 1;
+};
+
+void ExpectTopic(const TTopicConfigurationMatcher& matcher)
+{
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(GetEnv("YDB_ENDPOINT"));
+ config.SetDatabase(GetEnv("YDB_DATABASE"));
+
+ NYdb::TDriver driver(config);
+ NYdb::NTopic::TTopicClient client(driver);
+
+ auto result =
+ client.DescribeTopic(matcher.Topic).GetValueSync();
+ auto& description = result.GetTopicDescription();
+
+ UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), matcher.Partitions);
+ UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers);
+}
+
Y_UNIT_TEST(RunFull) {
ExecYdbWorkloadTopic({"init"});
auto output = ExecYdbWorkloadTopic({"run", "full", "-s", "10"});
@@ -45,4 +72,39 @@ Y_UNIT_TEST(RunFull) {
UNIT_ASSERT_GE(fullTime, 0);
UNIT_ASSERT_LT(fullTime, 10'000);
}
+
+Y_UNIT_TEST(Init_Clean)
+{
+ //
+ // default `init` + `clean`
+ //
+ ExecYdbWorkloadTopic({"init"});
+ ExpectTopic({.Topic="workload-topic", .Partitions=128, .Consumers=1});
+
+ ExecYdbWorkloadTopic({"clean"});
+ ExpectTopic({.Topic="workload-topic", .Partitions=0, .Consumers=0});
+
+ //
+ // specific `init` + `clean`
+ //
+ ExecYdbWorkloadTopic({"init", "--topic", "qqqq", "-p", "3", "-c", "5"});
+ ExpectTopic({.Topic="qqqq", .Partitions=3, .Consumers=5});
+
+ UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"clean"}), yexception);
+
+ ExecYdbWorkloadTopic({"clean", "--topic", "qqqq"});
+ ExpectTopic({.Topic="qqqq", .Partitions=0, .Consumers=0});
+}
+
+Y_UNIT_TEST(Clean_Without_Init)
+{
+ UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"clean"}), yexception);
+}
+
+Y_UNIT_TEST(Double_Init)
+{
+ ExecYdbWorkloadTopic({"init"});
+ UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"init"}), yexception);
+}
+
}
diff --git a/ydb/apps/ydb/ut/ya.make b/ydb/apps/ydb/ut/ya.make
index c9ee286ae37..34238a13747 100644
--- a/ydb/apps/ydb/ut/ya.make
+++ b/ydb/apps/ydb/ut/ya.make
@@ -8,6 +8,10 @@ SRCS(
main.cpp
)
+PEERDIR(
+ ydb/public/sdk/cpp/client/ydb_topic
+)
+
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
END()
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
index 83c64604a5f..2fe4e94b79f 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
@@ -1,8 +1,10 @@
#include "topic_readwrite_scenario.h"
#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_describe.h>
#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h>
#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
@@ -25,8 +27,6 @@ int TTopicOperationsScenario::Run(const TConfig& config)
InitDriver(config);
InitStatsCollector();
- StatsCollector->PrintHeader();
-
return DoRun(config);
}
@@ -77,6 +77,65 @@ void TTopicOperationsScenario::InitStatsCollector()
ErrorFlag);
}
+void TTopicOperationsScenario::CreateTopic(const TString& database,
+ const TString& topic,
+ ui32 partitionCount,
+ ui32 consumerCount)
+{
+ auto topicPath =
+ TCommandWorkloadTopicDescribe::GenerateFullTopicName(database, topic);
+
+ EnsureTopicNotExist(topicPath);
+ CreateTopic(topicPath, partitionCount, consumerCount);
+}
+
+void TTopicOperationsScenario::DropTopic(const TString& database,
+ const TString& topic)
+{
+ Y_VERIFY(Driver);
+
+ NTopic::TTopicClient client(*Driver);
+ auto topicPath =
+ TCommandWorkloadTopicDescribe::GenerateFullTopicName(database, topic);
+
+ auto result = client.DropTopic(topicPath).GetValueSync();
+ ThrowOnError(result);
+}
+
+void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic)
+{
+ Y_VERIFY(Driver);
+
+ NTopic::TTopicClient client(*Driver);
+
+ auto result = client.DescribeTopic(topic, {}).GetValueSync();
+
+ if (result.GetTopicDescription().GetTotalPartitionsCount() != 0) {
+ ythrow yexception() << "Topic '" << topic << "' already exists.";
+ }
+}
+
+void TTopicOperationsScenario::CreateTopic(const TString& topic,
+ ui32 partitionCount,
+ ui32 consumerCount)
+{
+ Y_VERIFY(Driver);
+
+ NTopic::TTopicClient client(*Driver);
+
+ NTopic::TCreateTopicSettings settings;
+ settings.PartitioningSettings(partitionCount, partitionCount);
+
+ for (unsigned consumerIdx = 0; consumerIdx < consumerCount; ++consumerIdx) {
+ settings
+ .BeginAddConsumer(TCommandWorkloadTopicDescribe::GenerateConsumerName(consumerIdx))
+ .EndAddConsumer();
+ }
+
+ auto result = client.CreateTopic(topic, settings).GetValueSync();
+ ThrowOnError(result);
+}
+
void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void>>& threads,
const TString& database)
{
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
index c5c92f79592..2f3c861c287 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
@@ -41,6 +41,7 @@ public:
bool PrintTimestamp;
double Percentile;
TString TopicName;
+ ui32 PartitionCount = 1;
ui32 ProducerThreadCount;
ui32 ConsumerThreadCount;
ui32 ConsumerCount;
@@ -50,6 +51,13 @@ public:
ui32 Codec;
protected:
+ void CreateTopic(const TString& database,
+ const TString& topic,
+ ui32 partitionCount,
+ ui32 consumerCount);
+ void DropTopic(const TString& database,
+ const TString& topic);
+
void StartConsumerThreads(std::vector<std::future<void>>& threads,
const TString& database);
void StartProducerThreads(std::vector<std::future<void>>& threads,
@@ -70,6 +78,11 @@ protected:
private:
virtual int DoRun(const TClientCommand::TConfig& config) = 0;
+ void EnsureTopicNotExist(const TString& topic);
+ void CreateTopic(const TString& topic,
+ ui32 partitionCount,
+ ui32 consumerCount);
+
static THolder<TLogBackend> MakeLogBackend(TClientCommand::TConfig::EVerbosityLevel level);
void InitLog(const TClientCommand::TConfig& config);
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
index b0bb2ecf1e9..bb651554eec 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp
@@ -9,32 +9,36 @@
using namespace NYdb::NConsoleClient;
+int TCommandWorkloadTopicClean::TScenario::DoRun(const TConfig& config)
+{
+ TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
+
+ DropTopic(config.Database, TopicName);
+
+ return EXIT_SUCCESS;
+}
+
TCommandWorkloadTopicClean::TCommandWorkloadTopicClean()
: TWorkloadCommand("clean", {}, "drop topic created in init phase")
{
}
-void TCommandWorkloadTopicClean::Config(TConfig& config) {
+void TCommandWorkloadTopicClean::Config(TConfig& config)
+{
TYdbCommand::Config(config);
config.SetFreeArgsNum(0);
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(TOPIC)
- .StoreResult(&TopicName);
+ .StoreResult(&Scenario.TopicName);
}
-void TCommandWorkloadTopicClean::Parse(TConfig& config) {
+void TCommandWorkloadTopicClean::Parse(TConfig& config)
+{
TClientCommand::Parse(config);
}
-int TCommandWorkloadTopicClean::Run(TConfig& config) {
- Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
- auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver);
-
- TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
-
- TString fullTopicName = TCommandWorkloadTopicDescribe::GenerateFullTopicName(config.Database, TopicName);
- auto result = topicClient->DropTopic(fullTopicName).GetValueSync();
- ThrowOnError(result);
- return EXIT_SUCCESS;
-} \ No newline at end of file
+int TCommandWorkloadTopicClean::Run(TConfig& config)
+{
+ return Scenario.Run(config);
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h
index 3540d4eee2c..7ded1ed6494 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h
@@ -1,22 +1,29 @@
#pragma once
#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTopicClean: public TWorkloadCommand {
+public:
+ TCommandWorkloadTopicClean();
+
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+private:
+ class TScenario : public TTopicOperationsScenario {
+ int DoRun(const TConfig& config) override;
+ };
+
+ TScenario Scenario;
+};
+
+class TCommandWorkloadTopicRun: public TClientCommandTree {
+public:
+ TCommandWorkloadTopicRun();
+};
-namespace NYdb {
- namespace NConsoleClient {
- class TCommandWorkloadTopicClean: public TWorkloadCommand {
- public:
- TCommandWorkloadTopicClean();
- virtual void Config(TConfig& config) override;
- virtual void Parse(TConfig& config) override;
- virtual int Run(TConfig& config) override;
- private:
- TString TopicName;
- };
-
- class TCommandWorkloadTopicRun: public TClientCommandTree {
- public:
- TCommandWorkloadTopicRun();
- };
- }
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
index 800867e7b62..bab243b2dfc 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp
@@ -8,54 +8,42 @@
using namespace NYdb::NConsoleClient;
+int TCommandWorkloadTopicInit::TScenario::DoRun(const TConfig& config)
+{
+ CreateTopic(config.Database, TopicName, PartitionCount, ConsumerCount);
+
+ return EXIT_SUCCESS;
+}
+
TCommandWorkloadTopicInit::TCommandWorkloadTopicInit()
: TWorkloadCommand("init", {}, "Create and initialize topic for workload")
- , PartitionCount(1)
{
}
-void TCommandWorkloadTopicInit::Config(TConfig& config) {
+void TCommandWorkloadTopicInit::Config(TConfig& config)
+{
TYdbCommand::Config(config);
config.SetFreeArgsNum(0);
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(TOPIC)
- .StoreResult(&TopicName);
+ .StoreResult(&Scenario.TopicName);
config.Opts->AddLongOption('p', "partitions", "Number of partitions in the topic.")
.DefaultValue(128)
- .StoreResult(&PartitionCount);
+ .StoreResult(&Scenario.PartitionCount);
config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.")
.DefaultValue(1)
- .StoreResult(&ConsumerCount);
+ .StoreResult(&Scenario.ConsumerCount);
}
-void TCommandWorkloadTopicInit::Parse(TConfig& config) {
+void TCommandWorkloadTopicInit::Parse(TConfig& config)
+{
TClientCommand::Parse(config);
}
-int TCommandWorkloadTopicInit::Run(TConfig& config) {
- Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
- auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver);
-
- auto fullTopicName = TCommandWorkloadTopicDescribe::GenerateFullTopicName(config.Database, TopicName);
- auto describeTopicResult = topicClient->DescribeTopic(fullTopicName, {}).GetValueSync();
- if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() != 0) {
- Cout << "Topic " << TopicName << " already exists.\n";
- return EXIT_FAILURE;
- }
-
- NYdb::NTopic::TCreateTopicSettings settings;
- settings.PartitioningSettings(PartitionCount, PartitionCount);
-
- for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) {
- settings.BeginAddConsumer(TCommandWorkloadTopicDescribe::GenerateConsumerName(consumerIdx))
- .EndAddConsumer();
- }
-
- auto result = topicClient->CreateTopic(fullTopicName, settings).GetValueSync();
- ThrowOnError(result);
-
- return EXIT_SUCCESS;
+int TCommandWorkloadTopicInit::Run(TConfig& config)
+{
+ return Scenario.Run(config);
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h
index 391b59b48f3..fd055a34acb 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h
@@ -1,21 +1,24 @@
#pragma once
#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h>
-namespace NYdb {
- namespace NConsoleClient {
- class TCommandWorkloadTopicInit: public TWorkloadCommand {
- public:
- TCommandWorkloadTopicInit();
- virtual void Config(TConfig& config) override;
- virtual void Parse(TConfig& config) override;
- virtual int Run(TConfig& config) override;
+namespace NYdb::NConsoleClient {
- private:
- TString TopicName;
+class TCommandWorkloadTopicInit : public TWorkloadCommand {
+public:
+ TCommandWorkloadTopicInit();
+
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+private:
+ class TScenario : public TTopicOperationsScenario {
+ int DoRun(const TConfig& config) override;
+ };
+
+ TScenario Scenario;
+};
- ui32 PartitionCount;
- ui32 ConsumerCount;
- };
- }
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
index 22e6d618edb..c5c30c46021 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
@@ -62,9 +62,12 @@ void TTopicWorkloadStatsCollector::PrintHeader(bool total) const {
}
void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() {
+ PrintHeader();
+
auto startTime = Now();
WarmupTime = startTime + TDuration::Seconds(WarmupSec);
auto stopTime = startTime + TDuration::Seconds(TotalSec + 1);
+
int windowIt = 1;
auto windowDuration = TDuration::Seconds(WindowSec);
while (Now() < stopTime && !*ErrorFlag) {
@@ -77,6 +80,7 @@ void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() {
}
Sleep(std::max(TDuration::Zero(), Now() - windowTime(windowIt)));
}
+
CollectThreadEvents();
}