diff options
author | abcdef <akotov@ydb.tech> | 2023-07-05 18:30:18 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-07-05 18:30:18 +0300 |
commit | 586b53c49dc7b17dc97b2110528f81ec4e081438 (patch) | |
tree | 3d659b2dbcc8c3c269324072c8dc9838c21dde6c | |
parent | d5fc94311c69dd2566760a0f129b91d2411c2b3c (diff) | |
download | ydb-586b53c49dc7b17dc97b2110528f81ec4e081438.tar.gz |
scripts for the `workload topic init|clean` commands
сценарии для `workload topic init|clean`
11 files changed, 225 insertions, 79 deletions
diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt index 30da5d464da..28cd8ac960d 100644 --- a/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/apps/ydb/ut/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC contrib-libs-cxxsupp yutil cpp-testing-unittest_main + cpp-client-ydb_topic ) target_link_options(ydb-apps-ydb-ut PRIVATE -ldl diff --git a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt index 9792960b278..fa8161df2d9 100644 --- a/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/apps/ydb/ut/CMakeLists.linux-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(ydb-apps-ydb-ut PUBLIC yutil library-cpp-cpuid_check cpp-testing-unittest_main + cpp-client-ydb_topic ) target_link_options(ydb-apps-ydb-ut PRIVATE -ldl diff --git a/ydb/apps/ydb/ut/main.cpp b/ydb/apps/ydb/ut/main.cpp index 0bd3be64cbb..056561dd132 100644 --- a/ydb/apps/ydb/ut/main.cpp +++ b/ydb/apps/ydb/ut/main.cpp @@ -1,17 +1,20 @@ #include <library/cpp/testing/common/env.h> #include <library/cpp/testing/unittest/registar.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + #include <util/string/cast.h> #include <util/string/split.h> #include <util/system/env.h> #include <util/system/shellcommand.h> +Y_UNIT_TEST_SUITE(YdbWorkloadTopic) { + TString ExecYdbWorkloadTopic(TList<TString> args) { // // ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload topic ${args} // - args.push_front("topic"); args.push_front("workload"); @@ -24,12 +27,36 @@ TString ExecYdbWorkloadTopic(TList<TString> args) TShellCommand command(BinaryPath("ydb/apps/ydb/ydb"), args); command.Run().Wait(); - UNIT_ASSERT_VALUES_EQUAL(command.GetExitCode(), 0); + if (command.GetExitCode() != 0) { + ythrow yexception() << "command `" << command.GetQuotedCommand() << "` exit with code " << command.GetExitCode(); + } return command.GetOutput(); } -Y_UNIT_TEST_SUITE(YdbWorkloadTopic) { +struct TTopicConfigurationMatcher { + TString Topic = "workload-topic"; + ui32 Partitions = 128; + ui32 Consumers = 1; +}; + +void ExpectTopic(const TTopicConfigurationMatcher& matcher) +{ + NYdb::TDriverConfig config; + config.SetEndpoint(GetEnv("YDB_ENDPOINT")); + config.SetDatabase(GetEnv("YDB_DATABASE")); + + NYdb::TDriver driver(config); + NYdb::NTopic::TTopicClient client(driver); + + auto result = + client.DescribeTopic(matcher.Topic).GetValueSync(); + auto& description = result.GetTopicDescription(); + + UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), matcher.Partitions); + UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers); +} + Y_UNIT_TEST(RunFull) { ExecYdbWorkloadTopic({"init"}); auto output = ExecYdbWorkloadTopic({"run", "full", "-s", "10"}); @@ -45,4 +72,39 @@ Y_UNIT_TEST(RunFull) { UNIT_ASSERT_GE(fullTime, 0); UNIT_ASSERT_LT(fullTime, 10'000); } + +Y_UNIT_TEST(Init_Clean) +{ + // + // default `init` + `clean` + // + ExecYdbWorkloadTopic({"init"}); + ExpectTopic({.Topic="workload-topic", .Partitions=128, .Consumers=1}); + + ExecYdbWorkloadTopic({"clean"}); + ExpectTopic({.Topic="workload-topic", .Partitions=0, .Consumers=0}); + + // + // specific `init` + `clean` + // + ExecYdbWorkloadTopic({"init", "--topic", "qqqq", "-p", "3", "-c", "5"}); + ExpectTopic({.Topic="qqqq", .Partitions=3, .Consumers=5}); + + UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"clean"}), yexception); + + ExecYdbWorkloadTopic({"clean", "--topic", "qqqq"}); + ExpectTopic({.Topic="qqqq", .Partitions=0, .Consumers=0}); +} + +Y_UNIT_TEST(Clean_Without_Init) +{ + UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"clean"}), yexception); +} + +Y_UNIT_TEST(Double_Init) +{ + ExecYdbWorkloadTopic({"init"}); + UNIT_ASSERT_EXCEPTION(ExecYdbWorkloadTopic({"init"}), yexception); +} + } diff --git a/ydb/apps/ydb/ut/ya.make b/ydb/apps/ydb/ut/ya.make index c9ee286ae37..34238a13747 100644 --- a/ydb/apps/ydb/ut/ya.make +++ b/ydb/apps/ydb/ut/ya.make @@ -8,6 +8,10 @@ SRCS( main.cpp ) +PEERDIR( + ydb/public/sdk/cpp/client/ydb_topic +) + INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) END() diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 83c64604a5f..2fe4e94b79f 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -1,8 +1,10 @@ #include "topic_readwrite_scenario.h" #include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h> +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_describe.h> #include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h> #include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> @@ -25,8 +27,6 @@ int TTopicOperationsScenario::Run(const TConfig& config) InitDriver(config); InitStatsCollector(); - StatsCollector->PrintHeader(); - return DoRun(config); } @@ -77,6 +77,65 @@ void TTopicOperationsScenario::InitStatsCollector() ErrorFlag); } +void TTopicOperationsScenario::CreateTopic(const TString& database, + const TString& topic, + ui32 partitionCount, + ui32 consumerCount) +{ + auto topicPath = + TCommandWorkloadTopicDescribe::GenerateFullTopicName(database, topic); + + EnsureTopicNotExist(topicPath); + CreateTopic(topicPath, partitionCount, consumerCount); +} + +void TTopicOperationsScenario::DropTopic(const TString& database, + const TString& topic) +{ + Y_VERIFY(Driver); + + NTopic::TTopicClient client(*Driver); + auto topicPath = + TCommandWorkloadTopicDescribe::GenerateFullTopicName(database, topic); + + auto result = client.DropTopic(topicPath).GetValueSync(); + ThrowOnError(result); +} + +void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic) +{ + Y_VERIFY(Driver); + + NTopic::TTopicClient client(*Driver); + + auto result = client.DescribeTopic(topic, {}).GetValueSync(); + + if (result.GetTopicDescription().GetTotalPartitionsCount() != 0) { + ythrow yexception() << "Topic '" << topic << "' already exists."; + } +} + +void TTopicOperationsScenario::CreateTopic(const TString& topic, + ui32 partitionCount, + ui32 consumerCount) +{ + Y_VERIFY(Driver); + + NTopic::TTopicClient client(*Driver); + + NTopic::TCreateTopicSettings settings; + settings.PartitioningSettings(partitionCount, partitionCount); + + for (unsigned consumerIdx = 0; consumerIdx < consumerCount; ++consumerIdx) { + settings + .BeginAddConsumer(TCommandWorkloadTopicDescribe::GenerateConsumerName(consumerIdx)) + .EndAddConsumer(); + } + + auto result = client.CreateTopic(topic, settings).GetValueSync(); + ThrowOnError(result); +} + void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void>>& threads, const TString& database) { diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h index c5c92f79592..2f3c861c287 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h @@ -41,6 +41,7 @@ public: bool PrintTimestamp; double Percentile; TString TopicName; + ui32 PartitionCount = 1; ui32 ProducerThreadCount; ui32 ConsumerThreadCount; ui32 ConsumerCount; @@ -50,6 +51,13 @@ public: ui32 Codec; protected: + void CreateTopic(const TString& database, + const TString& topic, + ui32 partitionCount, + ui32 consumerCount); + void DropTopic(const TString& database, + const TString& topic); + void StartConsumerThreads(std::vector<std::future<void>>& threads, const TString& database); void StartProducerThreads(std::vector<std::future<void>>& threads, @@ -70,6 +78,11 @@ protected: private: virtual int DoRun(const TClientCommand::TConfig& config) = 0; + void EnsureTopicNotExist(const TString& topic); + void CreateTopic(const TString& topic, + ui32 partitionCount, + ui32 consumerCount); + static THolder<TLogBackend> MakeLogBackend(TClientCommand::TConfig::EVerbosityLevel level); void InitLog(const TClientCommand::TConfig& config); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp index b0bb2ecf1e9..bb651554eec 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.cpp @@ -9,32 +9,36 @@ using namespace NYdb::NConsoleClient; +int TCommandWorkloadTopicClean::TScenario::DoRun(const TConfig& config) +{ + TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); + + DropTopic(config.Database, TopicName); + + return EXIT_SUCCESS; +} + TCommandWorkloadTopicClean::TCommandWorkloadTopicClean() : TWorkloadCommand("clean", {}, "drop topic created in init phase") { } -void TCommandWorkloadTopicClean::Config(TConfig& config) { +void TCommandWorkloadTopicClean::Config(TConfig& config) +{ TYdbCommand::Config(config); config.SetFreeArgsNum(0); config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(TOPIC) - .StoreResult(&TopicName); + .StoreResult(&Scenario.TopicName); } -void TCommandWorkloadTopicClean::Parse(TConfig& config) { +void TCommandWorkloadTopicClean::Parse(TConfig& config) +{ TClientCommand::Parse(config); } -int TCommandWorkloadTopicClean::Run(TConfig& config) { - Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); - auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver); - - TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); - - TString fullTopicName = TCommandWorkloadTopicDescribe::GenerateFullTopicName(config.Database, TopicName); - auto result = topicClient->DropTopic(fullTopicName).GetValueSync(); - ThrowOnError(result); - return EXIT_SUCCESS; -}
\ No newline at end of file +int TCommandWorkloadTopicClean::Run(TConfig& config) +{ + return Scenario.Run(config); +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h index 3540d4eee2c..7ded1ed6494 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_clean.h @@ -1,22 +1,29 @@ #pragma once #include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> +#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h> + +namespace NYdb::NConsoleClient { + +class TCommandWorkloadTopicClean: public TWorkloadCommand { +public: + TCommandWorkloadTopicClean(); + + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + +private: + class TScenario : public TTopicOperationsScenario { + int DoRun(const TConfig& config) override; + }; + + TScenario Scenario; +}; + +class TCommandWorkloadTopicRun: public TClientCommandTree { +public: + TCommandWorkloadTopicRun(); +}; -namespace NYdb { - namespace NConsoleClient { - class TCommandWorkloadTopicClean: public TWorkloadCommand { - public: - TCommandWorkloadTopicClean(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - private: - TString TopicName; - }; - - class TCommandWorkloadTopicRun: public TClientCommandTree { - public: - TCommandWorkloadTopicRun(); - }; - } } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp index 800867e7b62..bab243b2dfc 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp @@ -8,54 +8,42 @@ using namespace NYdb::NConsoleClient; +int TCommandWorkloadTopicInit::TScenario::DoRun(const TConfig& config) +{ + CreateTopic(config.Database, TopicName, PartitionCount, ConsumerCount); + + return EXIT_SUCCESS; +} + TCommandWorkloadTopicInit::TCommandWorkloadTopicInit() : TWorkloadCommand("init", {}, "Create and initialize topic for workload") - , PartitionCount(1) { } -void TCommandWorkloadTopicInit::Config(TConfig& config) { +void TCommandWorkloadTopicInit::Config(TConfig& config) +{ TYdbCommand::Config(config); config.SetFreeArgsNum(0); config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(TOPIC) - .StoreResult(&TopicName); + .StoreResult(&Scenario.TopicName); config.Opts->AddLongOption('p', "partitions", "Number of partitions in the topic.") .DefaultValue(128) - .StoreResult(&PartitionCount); + .StoreResult(&Scenario.PartitionCount); config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.") .DefaultValue(1) - .StoreResult(&ConsumerCount); + .StoreResult(&Scenario.ConsumerCount); } -void TCommandWorkloadTopicInit::Parse(TConfig& config) { +void TCommandWorkloadTopicInit::Parse(TConfig& config) +{ TClientCommand::Parse(config); } -int TCommandWorkloadTopicInit::Run(TConfig& config) { - Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); - auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(*Driver); - - auto fullTopicName = TCommandWorkloadTopicDescribe::GenerateFullTopicName(config.Database, TopicName); - auto describeTopicResult = topicClient->DescribeTopic(fullTopicName, {}).GetValueSync(); - if (describeTopicResult.GetTopicDescription().GetTotalPartitionsCount() != 0) { - Cout << "Topic " << TopicName << " already exists.\n"; - return EXIT_FAILURE; - } - - NYdb::NTopic::TCreateTopicSettings settings; - settings.PartitioningSettings(PartitionCount, PartitionCount); - - for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) { - settings.BeginAddConsumer(TCommandWorkloadTopicDescribe::GenerateConsumerName(consumerIdx)) - .EndAddConsumer(); - } - - auto result = topicClient->CreateTopic(fullTopicName, settings).GetValueSync(); - ThrowOnError(result); - - return EXIT_SUCCESS; +int TCommandWorkloadTopicInit::Run(TConfig& config) +{ + return Scenario.Run(config); } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h index 391b59b48f3..fd055a34acb 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.h @@ -1,21 +1,24 @@ #pragma once #include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> +#include <ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h> -namespace NYdb { - namespace NConsoleClient { - class TCommandWorkloadTopicInit: public TWorkloadCommand { - public: - TCommandWorkloadTopicInit(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; +namespace NYdb::NConsoleClient { - private: - TString TopicName; +class TCommandWorkloadTopicInit : public TWorkloadCommand { +public: + TCommandWorkloadTopicInit(); + + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + +private: + class TScenario : public TTopicOperationsScenario { + int DoRun(const TConfig& config) override; + }; + + TScenario Scenario; +}; - ui32 PartitionCount; - ui32 ConsumerCount; - }; - } } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp index 22e6d618edb..c5c30c46021 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp @@ -62,9 +62,12 @@ void TTopicWorkloadStatsCollector::PrintHeader(bool total) const { } void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() { + PrintHeader(); + auto startTime = Now(); WarmupTime = startTime + TDuration::Seconds(WarmupSec); auto stopTime = startTime + TDuration::Seconds(TotalSec + 1); + int windowIt = 1; auto windowDuration = TDuration::Seconds(WindowSec); while (Now() < stopTime && !*ErrorFlag) { @@ -77,6 +80,7 @@ void TTopicWorkloadStatsCollector::PrintWindowStatsLoop() { } Sleep(std::max(TDuration::Zero(), Now() - windowTime(windowIt))); } + CollectThreadEvents(); } |