diff options
author | shmel1k <shmel1k@yandex-team.ru> | 2022-02-10 16:52:22 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:22 +0300 |
commit | 22b078ed897cf3921584a23b29d2e7a42449fdd2 (patch) | |
tree | c454568897edc1f9e0a7624db04c1cc3bde0085d | |
parent | 794a2d21870524f63c281c0c368edf5a6cdec02c (diff) | |
download | ydb-22b078ed897cf3921584a23b29d2e7a42449fdd2.tar.gz |
Restoring authorship annotation for <shmel1k@yandex-team.ru>. Commit 1 of 2.
-rw-r--r-- | ydb/core/ymq/version.txt | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp | 192 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h | 10 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp | 516 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_stream.h | 176 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/command.cpp | 8 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/command.h | 14 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h | 18 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_proto/accessor.h | 2 |
13 files changed, 477 insertions, 477 deletions
diff --git a/ydb/core/ymq/version.txt b/ydb/core/ymq/version.txt index ca4ce39a998..e8611ba5aa1 100644 --- a/ydb/core/ymq/version.txt +++ b/ydb/core/ymq/version.txt @@ -1 +1 @@ -1102889784.tags.releases.ydb.stable-21-4-37.hardening +1102889784.tags.releases.ydb.stable-21-4-37.hardening diff --git a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp index 2cccf42a41f..13a5192f038 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp @@ -37,7 +37,7 @@ TClientCommandRootCommon::TClientCommandRootCommon(const TClientSettings& settin AddCommand(std::make_unique<TCommandConfig>()); AddCommand(std::make_unique<TCommandInit>()); AddCommand(std::make_unique<TCommandYql>()); - AddCommand(std::make_unique<TCommandStream>()); + AddCommand(std::make_unique<TCommandStream>()); AddCommand(std::make_unique<TCommandWorkload>()); } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index a6f81fe432a..1ec61b0d811 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -178,8 +178,8 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib switch (entry.Type) { case NScheme::ESchemeEntryType::Table: return DescribeTable(driver); - case NScheme::ESchemeEntryType::PqGroup: - return DescribeStream(driver); + case NScheme::ESchemeEntryType::PqGroup: + return DescribeStream(driver); default: WarnAboutTableOptions(); PrintEntryVerbose(entry, ShowPermissions); @@ -187,101 +187,101 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib return EXIT_SUCCESS; } -namespace { - TString FormatCodecs(const TVector<NYdb::NPersQueue::ECodec> codecs) { - if (codecs.empty()) { - return ""; - } - - TStringBuilder builder = TStringBuilder(); - for (unsigned int i = 0; i < codecs.size() - 1; ++i) { - builder << codecs[i] << ", "; - } - builder << codecs[codecs.size() - 1]; - return ToString(builder); - } - - void PrintStreamReadRules( - const TVector<NYdb::NPersQueue::TDescribeTopicResult:: - TTopicSettings::TReadRule>& readRules) { - if (readRules.empty()) { - return; - } - TPrettyTable table( - {"ConsumerName", "SupportedCodecs", - "StartingMessageTimestamp", "Important", - "ServiceType", "SupportedFormat", "Version"}); - for (const auto& rule: readRules) { - table.AddRow() - .Column(0, rule.ConsumerName()) - .Column(1, FormatCodecs(rule.SupportedCodecs())) - .Column(2, rule.StartingMessageTimestamp().ToRfc822StringLocal()) - .Column(3, rule.Important()) - .Column(4, rule.ServiceType()) - .Column(5, rule.SupportedFormat()) - .Column(6, rule.Version()); - } - Cout << Endl << "ReadRules: " << Endl; - Cout << table; - } -} - -int TCommandDescribe::PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings &settings) { - Cout << Endl << "RetentionPeriod: " << settings.RetentionPeriod().Hours() << " hours"; - Cout << Endl << "PartitionsCount: " << settings.PartitionsCount(); - Cout << Endl << "SupportedFormat: " << settings.SupportedFormat(); - if (!settings.SupportedCodecs().empty()) { - Cout << Endl << "SupportedCodecs: " << FormatCodecs(settings.SupportedCodecs()) << Endl; - } - PrintStreamReadRules(settings.ReadRules()); - return EXIT_SUCCESS; -} - -int TCommandDescribe::PrintStreamResponseProtoJsonBase64( - const NYdb::NPersQueue:: - TDescribeTopicResult& result) { - TString json; - google::protobuf::util::JsonPrintOptions jsonOpts; - jsonOpts.preserve_proto_field_names = true; - auto convertStatus = google::protobuf::util::MessageToJsonString( - TProtoAccessor::GetProto(result), - &json, - jsonOpts - ); - if (convertStatus.ok()) { - Cout << json << Endl; - } else { - Cerr << "Error occurred while converting result proto to json" << Endl; - return EXIT_FAILURE; - } - return EXIT_SUCCESS; -} - -int TCommandDescribe::PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result) { - switch (OutputFormat) { - case EOutputFormat::Default: - case EOutputFormat::Pretty: - PrintStreamResponsePretty(result.TopicSettings()); - break; - case EOutputFormat::Json: - Cerr << "Warning! Option --json is deprecated and will be removed soon. " - << "Use \"--format proto-json-base64\" option instead." << Endl; +namespace { + TString FormatCodecs(const TVector<NYdb::NPersQueue::ECodec> codecs) { + if (codecs.empty()) { + return ""; + } + + TStringBuilder builder = TStringBuilder(); + for (unsigned int i = 0; i < codecs.size() - 1; ++i) { + builder << codecs[i] << ", "; + } + builder << codecs[codecs.size() - 1]; + return ToString(builder); + } + + void PrintStreamReadRules( + const TVector<NYdb::NPersQueue::TDescribeTopicResult:: + TTopicSettings::TReadRule>& readRules) { + if (readRules.empty()) { + return; + } + TPrettyTable table( + {"ConsumerName", "SupportedCodecs", + "StartingMessageTimestamp", "Important", + "ServiceType", "SupportedFormat", "Version"}); + for (const auto& rule: readRules) { + table.AddRow() + .Column(0, rule.ConsumerName()) + .Column(1, FormatCodecs(rule.SupportedCodecs())) + .Column(2, rule.StartingMessageTimestamp().ToRfc822StringLocal()) + .Column(3, rule.Important()) + .Column(4, rule.ServiceType()) + .Column(5, rule.SupportedFormat()) + .Column(6, rule.Version()); + } + Cout << Endl << "ReadRules: " << Endl; + Cout << table; + } +} + +int TCommandDescribe::PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings &settings) { + Cout << Endl << "RetentionPeriod: " << settings.RetentionPeriod().Hours() << " hours"; + Cout << Endl << "PartitionsCount: " << settings.PartitionsCount(); + Cout << Endl << "SupportedFormat: " << settings.SupportedFormat(); + if (!settings.SupportedCodecs().empty()) { + Cout << Endl << "SupportedCodecs: " << FormatCodecs(settings.SupportedCodecs()) << Endl; + } + PrintStreamReadRules(settings.ReadRules()); + return EXIT_SUCCESS; +} + +int TCommandDescribe::PrintStreamResponseProtoJsonBase64( + const NYdb::NPersQueue:: + TDescribeTopicResult& result) { + TString json; + google::protobuf::util::JsonPrintOptions jsonOpts; + jsonOpts.preserve_proto_field_names = true; + auto convertStatus = google::protobuf::util::MessageToJsonString( + TProtoAccessor::GetProto(result), + &json, + jsonOpts + ); + if (convertStatus.ok()) { + Cout << json << Endl; + } else { + Cerr << "Error occurred while converting result proto to json" << Endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +int TCommandDescribe::PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result) { + switch (OutputFormat) { + case EOutputFormat::Default: + case EOutputFormat::Pretty: + PrintStreamResponsePretty(result.TopicSettings()); + break; + case EOutputFormat::Json: + Cerr << "Warning! Option --json is deprecated and will be removed soon. " + << "Use \"--format proto-json-base64\" option instead." << Endl; [[fallthrough]]; - case EOutputFormat::ProtoJsonBase64: - return PrintStreamResponseProtoJsonBase64(result); - default: - throw TMissUseException() << "This command doesn't support " << OutputFormat << " output format"; - } - return EXIT_SUCCESS; -} - -int TCommandDescribe::DescribeStream(TDriver& driver) { - NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); - auto describeResult = persQueueClient.DescribeTopic(Path).GetValueSync(); - ThrowOnError(describeResult); - return PrintStreamResponse(describeResult); -} - + case EOutputFormat::ProtoJsonBase64: + return PrintStreamResponseProtoJsonBase64(result); + default: + throw TMissUseException() << "This command doesn't support " << OutputFormat << " output format"; + } + return EXIT_SUCCESS; +} + +int TCommandDescribe::DescribeStream(TDriver& driver) { + NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); + auto describeResult = persQueueClient.DescribeTopic(Path).GetValueSync(); + ThrowOnError(describeResult); + return PrintStreamResponse(describeResult); +} + int TCommandDescribe::DescribeTable(TDriver& driver) { NTable::TTableClient client(driver); NTable::TCreateSessionResult sessionResult = client.GetSession(NTable::TCreateSessionSettings()).GetValueSync(); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h index c87e4929f38..ec0a5ac68b5 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h @@ -48,11 +48,11 @@ private: int PrintResponseProtoJsonBase64(const NTable::TTableDescription& tableDescription); void WarnAboutTableOptions(); - int DescribeStream(TDriver& driver); - int PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result); - int PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings& settings); - int PrintStreamResponseProtoJsonBase64(const NYdb::NPersQueue::TDescribeTopicResult& result); - + int DescribeStream(TDriver& driver); + int PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result); + int PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings& settings); + int PrintStreamResponseProtoJsonBase64(const NYdb::NPersQueue::TDescribeTopicResult& result); + // Common options bool ShowPermissions = false; // Table options diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp index 24b478f8810..6c6f222fa1f 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp @@ -1,268 +1,268 @@ -#include "ydb_service_stream.h" +#include "ydb_service_stream.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> #include <ydb/public/lib/ydb_cli/commands/ydb_command.h> #include <ydb/public/lib/ydb_cli/common/command.h> - -#include <util/stream/str.h> -#include <util/string/vector.h> - - -namespace NYdb::NConsoleClient { - namespace { - THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptions = { - {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression"}, - {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"}, - {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"}, - {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"}, - }; - - THashMap<TString, NYdb::NPersQueue::ECodec> ExistingCodecs = { - std::pair<TString, NYdb::NPersQueue::ECodec>("raw", NYdb::NPersQueue::ECodec::RAW), - std::pair<TString, NYdb::NPersQueue::ECodec>("gzip", NYdb::NPersQueue::ECodec::GZIP), - std::pair<TString, NYdb::NPersQueue::ECodec>("lzop", NYdb::NPersQueue::ECodec::LZOP), - std::pair<TString, NYdb::NPersQueue::ECodec>("zstd", NYdb::NPersQueue::ECodec::ZSTD), - }; - }// namespace - - void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs) { - TStringStream description; - description << "Comma-separated list of supported codecs. Available codecs: "; - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - for (const auto &codec : supportedCodecs) { - auto findResult = CodecsDescriptions.find(codec); - Y_VERIFY(findResult != CodecsDescriptions.end(), - "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); - description << "\n " << colors.BoldColor() << codec << colors.OldColor() - << "\n " << findResult->second; - } - config.Opts->AddLongOption("supported-codecs", description.Str()) - .RequiredArgument("STRING") - .StoreResult(&SupportedCodecsStr_); - AllowedCodecs_ = supportedCodecs; - } - - void TCommandWithSupportedCodecs::ParseCodecs() { - if (SupportedCodecsStr_.empty()) { - return; - } - - TVector<NPersQueue::ECodec> parsedCodecs; - TVector<TString> split = SplitString(SupportedCodecsStr_, ","); - for (const TString &codecStr : split) { - auto exists = ExistingCodecs.find(to_lower(codecStr)); - if (exists == ExistingCodecs.end()) { - throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command"; - } - - if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) { - throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command"; - } - - SupportedCodecs_.push_back(exists->second); - } - } - - TVector<NPersQueue::ECodec> TCommandWithSupportedCodecs::GetCodecs() { - return SupportedCodecs_; - } - - TCommandStream::TCommandStream() - : TClientCommandTree("stream", {}, "DataStreams service operations") { + +#include <util/stream/str.h> +#include <util/string/vector.h> + + +namespace NYdb::NConsoleClient { + namespace { + THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptions = { + {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression"}, + {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"}, + {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"}, + {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"}, + }; + + THashMap<TString, NYdb::NPersQueue::ECodec> ExistingCodecs = { + std::pair<TString, NYdb::NPersQueue::ECodec>("raw", NYdb::NPersQueue::ECodec::RAW), + std::pair<TString, NYdb::NPersQueue::ECodec>("gzip", NYdb::NPersQueue::ECodec::GZIP), + std::pair<TString, NYdb::NPersQueue::ECodec>("lzop", NYdb::NPersQueue::ECodec::LZOP), + std::pair<TString, NYdb::NPersQueue::ECodec>("zstd", NYdb::NPersQueue::ECodec::ZSTD), + }; + }// namespace + + void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs) { + TStringStream description; + description << "Comma-separated list of supported codecs. Available codecs: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto &codec : supportedCodecs) { + auto findResult = CodecsDescriptions.find(codec); + Y_VERIFY(findResult != CodecsDescriptions.end(), + "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); + description << "\n " << colors.BoldColor() << codec << colors.OldColor() + << "\n " << findResult->second; + } + config.Opts->AddLongOption("supported-codecs", description.Str()) + .RequiredArgument("STRING") + .StoreResult(&SupportedCodecsStr_); + AllowedCodecs_ = supportedCodecs; + } + + void TCommandWithSupportedCodecs::ParseCodecs() { + if (SupportedCodecsStr_.empty()) { + return; + } + + TVector<NPersQueue::ECodec> parsedCodecs; + TVector<TString> split = SplitString(SupportedCodecsStr_, ","); + for (const TString &codecStr : split) { + auto exists = ExistingCodecs.find(to_lower(codecStr)); + if (exists == ExistingCodecs.end()) { + throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command"; + } + + if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) { + throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command"; + } + + SupportedCodecs_.push_back(exists->second); + } + } + + TVector<NPersQueue::ECodec> TCommandWithSupportedCodecs::GetCodecs() { + return SupportedCodecs_; + } + + TCommandStream::TCommandStream() + : TClientCommandTree("stream", {}, "DataStreams service operations") { AddCommand(std::make_unique<TCommandStreamCreate>()); AddCommand(std::make_unique<TCommandStreamAlter>()); AddCommand(std::make_unique<TCommandStreamDrop>()); AddCommand(std::make_unique<TCommandStreamConsumer>()); - } - - TCommandStreamCreate::TCommandStreamCreate() - : TYdbCommand("create", {}, "Create stream command") {} - - void TCommandStreamCreate::Config(TConfig &config) { - TYdbCommand::Config(config); - config.Opts->AddLongOption("partitions-count", "Total partitions count for stream") - .DefaultValue(1) - .StoreResult(&PartitionsCount_); - config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in stream is stored") - .DefaultValue(18) - .Optional() - .StoreResult(&RetentionPeriodHours_); - config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "New stream name"); - AddAllowedCodecs(config, AllowedCodecs); - } - - void TCommandStreamCreate::Parse(TConfig &config) { - TYdbCommand::Parse(config); - ParseStreamName(config, 0); - ParseCodecs(); - } - - int TCommandStreamCreate::Run(TConfig &config) { - TDriver driver = CreateDriver(config); - NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); - - auto settings = NYdb::NPersQueue::TCreateTopicSettings(); - settings.PartitionsCount(PartitionsCount_); - settings.MaxPartitionWriteSpeed(1024 * 1024); - settings.MaxPartitionWriteBurst(1024 * 1024); - auto codecs = GetCodecs(); - if (!codecs.empty()) { - settings.SupportedCodecs(GetCodecs()); - } - settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_)); - - auto status = persQueueClient.CreateTopic(StreamName, settings).GetValueSync(); - ThrowOnError(status); - return EXIT_SUCCESS; - } - - TCommandStreamAlter::TCommandStreamAlter() : TYdbCommand("alter", {}, "Alter stream command") {} - - void TCommandStreamAlter::Config(TConfig &config) { - TYdbCommand::Config(config); - config.Opts->AddLongOption("partitions-count", "Total partitions count for stream") - .StoreResult(&PartitionsCount_); - config.Opts->AddLongOption("retention-period-hours", "Duration for which data in stream is stored") - .Optional() - .StoreResult(&RetentionPeriodHours_); - config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "Stream to alter"); - AddAllowedCodecs(config, AllowedCodecs); - } - - void TCommandStreamAlter::Parse(TConfig &config) { - TYdbCommand::Parse(config); - ParseStreamName(config, 0); - ParseCodecs(); - } - - NYdb::NPersQueue::TAlterTopicSettings TCommandStreamAlter::PrepareAlterSettings( - NYdb::NPersQueue::TDescribeTopicResult &describeResult) { - auto settings = NYdb::NPersQueue::TAlterTopicSettings(); - settings.SetSettings(describeResult.TopicSettings()); - - if (PartitionsCount_.Defined()) { - settings.PartitionsCount(*PartitionsCount_.Get()); - } - - auto codecs = GetCodecs(); - if (!codecs.empty()) { - settings.SupportedCodecs(codecs); - } - - if (RetentionPeriodHours_.Defined()) { - settings.RetentionPeriod(TDuration::Hours(*RetentionPeriodHours_.Get())); - } - - return settings; - } - - int TCommandStreamAlter::Run(TConfig &config) { - if (!PartitionsCount_.Defined() && GetCodecs().empty() && !RetentionPeriodHours_.Defined()) { - return EXIT_SUCCESS; - } - - TDriver driver = CreateDriver(config); - NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); - - auto describeResult = persQueueClient.DescribeTopic(StreamName).GetValueSync(); - ThrowOnError(describeResult); - - auto settings = PrepareAlterSettings(describeResult); - auto result = persQueueClient.AlterTopic(StreamName, settings).GetValueSync(); - ThrowOnError(result); - return EXIT_SUCCESS; - } - - TCommandStreamDrop::TCommandStreamDrop() : TYdbCommand("drop", {}, "Drop stream command") {} - - void TCommandStreamDrop::Parse(TConfig &config) { - TYdbCommand::Parse(config); - ParseStreamName(config, 0); - } - - void TCommandStreamDrop::Config(TConfig &config) { - TYdbCommand::Config(config); - config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "Stream which will be dropped"); - } - - int TCommandStreamDrop::Run(TConfig &config) { - TDriver driver = CreateDriver(config); - NPersQueue::TPersQueueClient persQueueClient(driver); - - auto settings = NYdb::NPersQueue::TDropTopicSettings(); - TStatus status = persQueueClient.DropTopic(StreamName, settings).GetValueSync(); - ThrowOnError(status); - return EXIT_SUCCESS; - } - - TCommandStreamConsumer::TCommandStreamConsumer() : TClientCommandTree("consumer", {}, "Consumer operations") { + } + + TCommandStreamCreate::TCommandStreamCreate() + : TYdbCommand("create", {}, "Create stream command") {} + + void TCommandStreamCreate::Config(TConfig &config) { + TYdbCommand::Config(config); + config.Opts->AddLongOption("partitions-count", "Total partitions count for stream") + .DefaultValue(1) + .StoreResult(&PartitionsCount_); + config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in stream is stored") + .DefaultValue(18) + .Optional() + .StoreResult(&RetentionPeriodHours_); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "New stream name"); + AddAllowedCodecs(config, AllowedCodecs); + } + + void TCommandStreamCreate::Parse(TConfig &config) { + TYdbCommand::Parse(config); + ParseStreamName(config, 0); + ParseCodecs(); + } + + int TCommandStreamCreate::Run(TConfig &config) { + TDriver driver = CreateDriver(config); + NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); + + auto settings = NYdb::NPersQueue::TCreateTopicSettings(); + settings.PartitionsCount(PartitionsCount_); + settings.MaxPartitionWriteSpeed(1024 * 1024); + settings.MaxPartitionWriteBurst(1024 * 1024); + auto codecs = GetCodecs(); + if (!codecs.empty()) { + settings.SupportedCodecs(GetCodecs()); + } + settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_)); + + auto status = persQueueClient.CreateTopic(StreamName, settings).GetValueSync(); + ThrowOnError(status); + return EXIT_SUCCESS; + } + + TCommandStreamAlter::TCommandStreamAlter() : TYdbCommand("alter", {}, "Alter stream command") {} + + void TCommandStreamAlter::Config(TConfig &config) { + TYdbCommand::Config(config); + config.Opts->AddLongOption("partitions-count", "Total partitions count for stream") + .StoreResult(&PartitionsCount_); + config.Opts->AddLongOption("retention-period-hours", "Duration for which data in stream is stored") + .Optional() + .StoreResult(&RetentionPeriodHours_); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "Stream to alter"); + AddAllowedCodecs(config, AllowedCodecs); + } + + void TCommandStreamAlter::Parse(TConfig &config) { + TYdbCommand::Parse(config); + ParseStreamName(config, 0); + ParseCodecs(); + } + + NYdb::NPersQueue::TAlterTopicSettings TCommandStreamAlter::PrepareAlterSettings( + NYdb::NPersQueue::TDescribeTopicResult &describeResult) { + auto settings = NYdb::NPersQueue::TAlterTopicSettings(); + settings.SetSettings(describeResult.TopicSettings()); + + if (PartitionsCount_.Defined()) { + settings.PartitionsCount(*PartitionsCount_.Get()); + } + + auto codecs = GetCodecs(); + if (!codecs.empty()) { + settings.SupportedCodecs(codecs); + } + + if (RetentionPeriodHours_.Defined()) { + settings.RetentionPeriod(TDuration::Hours(*RetentionPeriodHours_.Get())); + } + + return settings; + } + + int TCommandStreamAlter::Run(TConfig &config) { + if (!PartitionsCount_.Defined() && GetCodecs().empty() && !RetentionPeriodHours_.Defined()) { + return EXIT_SUCCESS; + } + + TDriver driver = CreateDriver(config); + NYdb::NPersQueue::TPersQueueClient persQueueClient(driver); + + auto describeResult = persQueueClient.DescribeTopic(StreamName).GetValueSync(); + ThrowOnError(describeResult); + + auto settings = PrepareAlterSettings(describeResult); + auto result = persQueueClient.AlterTopic(StreamName, settings).GetValueSync(); + ThrowOnError(result); + return EXIT_SUCCESS; + } + + TCommandStreamDrop::TCommandStreamDrop() : TYdbCommand("drop", {}, "Drop stream command") {} + + void TCommandStreamDrop::Parse(TConfig &config) { + TYdbCommand::Parse(config); + ParseStreamName(config, 0); + } + + void TCommandStreamDrop::Config(TConfig &config) { + TYdbCommand::Config(config); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "Stream which will be dropped"); + } + + int TCommandStreamDrop::Run(TConfig &config) { + TDriver driver = CreateDriver(config); + NPersQueue::TPersQueueClient persQueueClient(driver); + + auto settings = NYdb::NPersQueue::TDropTopicSettings(); + TStatus status = persQueueClient.DropTopic(StreamName, settings).GetValueSync(); + ThrowOnError(status); + return EXIT_SUCCESS; + } + + TCommandStreamConsumer::TCommandStreamConsumer() : TClientCommandTree("consumer", {}, "Consumer operations") { AddCommand(std::make_unique<TCommandStreamConsumerAdd>()); AddCommand(std::make_unique<TCommandStreamConsumerDrop>()); - } - - TCommandStreamConsumerAdd::TCommandStreamConsumerAdd() : TYdbCommand("add", {}, "Consumer add operation") {} - - void TCommandStreamConsumerAdd::Config(TConfig &config) { - TYdbCommand::Parse(config); - config.Opts->AddLongOption("consumer-name", "New consumer for stream") - .Required() - .StoreResult(&ConsumerName_); - config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed") - .Optional() - .StoreResult(&StartingMessageTimestamp_); - config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "Stream for which consumer will be added"); - } - - void TCommandStreamConsumerAdd::Parse(TConfig &config) { - TYdbCommand::Parse(config); - ParseStreamName(config, 0); - } - - int TCommandStreamConsumerAdd::Run(TConfig &config) { - TDriver driver = CreateDriver(config); - NPersQueue::TPersQueueClient persQueueClient(driver); - - NYdb::NPersQueue::TReadRuleSettings readRuleSettings = NYdb::NPersQueue::TReadRuleSettings(); - readRuleSettings.ConsumerName(ConsumerName_); - if (StartingMessageTimestamp_.Defined()) { - readRuleSettings.StartingMessageTimestamp(TInstant::Seconds(*StartingMessageTimestamp_.Get())); - } - - auto addReadRuleSettings = NYdb::NPersQueue::TAddReadRuleSettings(); - addReadRuleSettings.ReadRule(readRuleSettings); - TStatus status = persQueueClient.AddReadRule(StreamName, addReadRuleSettings).GetValueSync(); - ThrowOnError(status); - return EXIT_SUCCESS; - } - - TCommandStreamConsumerDrop::TCommandStreamConsumerDrop() : TYdbCommand("drop", {}, "Consumer drop operation") {} - - void TCommandStreamConsumerDrop::Config(TConfig &config) { - TYdbCommand::Config(config); - config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped") - .Required() - .StoreResult(&ConsumerName_); - config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "Stream from which consumer will be dropped"); - } - - void TCommandStreamConsumerDrop::Parse(TConfig &config) { - TYdbCommand::Parse(config); - ParseStreamName(config, 0); - } - - int TCommandStreamConsumerDrop::Run(TConfig &config) { - TDriver driver = CreateDriver(config); - NPersQueue::TPersQueueClient persQueueClient(driver); - - NYdb::NPersQueue::TRemoveReadRuleSettings removeReadRuleSettings = NYdb::NPersQueue::TRemoveReadRuleSettings(); - removeReadRuleSettings.ConsumerName(ConsumerName_); - - TStatus status = persQueueClient.RemoveReadRule(StreamName, removeReadRuleSettings).GetValueSync(); - ThrowOnError(status); - return EXIT_SUCCESS; - } + } + + TCommandStreamConsumerAdd::TCommandStreamConsumerAdd() : TYdbCommand("add", {}, "Consumer add operation") {} + + void TCommandStreamConsumerAdd::Config(TConfig &config) { + TYdbCommand::Parse(config); + config.Opts->AddLongOption("consumer-name", "New consumer for stream") + .Required() + .StoreResult(&ConsumerName_); + config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed") + .Optional() + .StoreResult(&StartingMessageTimestamp_); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "Stream for which consumer will be added"); + } + + void TCommandStreamConsumerAdd::Parse(TConfig &config) { + TYdbCommand::Parse(config); + ParseStreamName(config, 0); + } + + int TCommandStreamConsumerAdd::Run(TConfig &config) { + TDriver driver = CreateDriver(config); + NPersQueue::TPersQueueClient persQueueClient(driver); + + NYdb::NPersQueue::TReadRuleSettings readRuleSettings = NYdb::NPersQueue::TReadRuleSettings(); + readRuleSettings.ConsumerName(ConsumerName_); + if (StartingMessageTimestamp_.Defined()) { + readRuleSettings.StartingMessageTimestamp(TInstant::Seconds(*StartingMessageTimestamp_.Get())); + } + + auto addReadRuleSettings = NYdb::NPersQueue::TAddReadRuleSettings(); + addReadRuleSettings.ReadRule(readRuleSettings); + TStatus status = persQueueClient.AddReadRule(StreamName, addReadRuleSettings).GetValueSync(); + ThrowOnError(status); + return EXIT_SUCCESS; + } + + TCommandStreamConsumerDrop::TCommandStreamConsumerDrop() : TYdbCommand("drop", {}, "Consumer drop operation") {} + + void TCommandStreamConsumerDrop::Config(TConfig &config) { + TYdbCommand::Config(config); + config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped") + .Required() + .StoreResult(&ConsumerName_); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "Stream from which consumer will be dropped"); + } + + void TCommandStreamConsumerDrop::Parse(TConfig &config) { + TYdbCommand::Parse(config); + ParseStreamName(config, 0); + } + + int TCommandStreamConsumerDrop::Run(TConfig &config) { + TDriver driver = CreateDriver(config); + NPersQueue::TPersQueueClient persQueueClient(driver); + + NYdb::NPersQueue::TRemoveReadRuleSettings removeReadRuleSettings = NYdb::NPersQueue::TRemoveReadRuleSettings(); + removeReadRuleSettings.ConsumerName(ConsumerName_); + + TStatus status = persQueueClient.RemoveReadRule(StreamName, removeReadRuleSettings).GetValueSync(); + ThrowOnError(status); + return EXIT_SUCCESS; + } }// namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h index f16d3b39b74..13ac9117133 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h @@ -1,90 +1,90 @@ -#pragma once - -#include "ydb_command.h" -#include "ydb_common.h" - +#pragma once + +#include "ydb_command.h" +#include "ydb_common.h" + #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> - -namespace NYdb::NConsoleClient { - TVector<NYdb::NPersQueue::ECodec> InitAllowedCodecs(); - const TVector<NYdb::NPersQueue::ECodec> AllowedCodecs = InitAllowedCodecs(); - - class TCommandWithSupportedCodecs { - protected: - void AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs); - void ParseCodecs(); - TVector<NPersQueue::ECodec> GetCodecs(); - - private: - TString SupportedCodecsStr_; - TVector<NPersQueue::ECodec> AllowedCodecs_; - TVector<NPersQueue::ECodec> SupportedCodecs_; - }; - - class TCommandStream : public TClientCommandTree { - public: - TCommandStream(); - }; - - - class TCommandStreamCreate : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs { - public: - TCommandStreamCreate(); - void Config(TConfig &config) override; - void Parse(TConfig &config) override; - int Run(TConfig &config) override; - - private: - ui64 RetentionPeriodHours_; - ui32 PartitionsCount_; - }; - - class TCommandStreamAlter : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs { - public: - TCommandStreamAlter(); - void Config(TConfig &config) override; - void Parse(TConfig &config) override; - int Run(TConfig &config) override; - - private: - TMaybe<ui64> RetentionPeriodHours_; - TMaybe<ui32> PartitionsCount_; - NYdb::NPersQueue::TAlterTopicSettings PrepareAlterSettings(NYdb::NPersQueue::TDescribeTopicResult &describeResult); - }; - - class TCommandStreamDrop : public TYdbCommand, public TCommandWithStreamName { - public: - TCommandStreamDrop(); - void Config(TConfig &config) override; - void Parse(TConfig &config) override; - int Run(TConfig &config) override; - }; - - class TCommandStreamConsumer : public TClientCommandTree { - public: - TCommandStreamConsumer(); - }; - - class TCommandStreamConsumerAdd : public TYdbCommand, public TCommandWithStreamName { - public: - TCommandStreamConsumerAdd(); - void Config(TConfig &config) override; - void Parse(TConfig &config) override; - int Run(TConfig &config) override; - - private: - TString ConsumerName_; - TMaybe<ui64> StartingMessageTimestamp_; - }; - - class TCommandStreamConsumerDrop : public TYdbCommand, public TCommandWithStreamName { - public: - TCommandStreamConsumerDrop(); - void Config(TConfig &config) override; - void Parse(TConfig &config) override; - int Run(TConfig &config) override; - - private: - TString ConsumerName_; - }; + +namespace NYdb::NConsoleClient { + TVector<NYdb::NPersQueue::ECodec> InitAllowedCodecs(); + const TVector<NYdb::NPersQueue::ECodec> AllowedCodecs = InitAllowedCodecs(); + + class TCommandWithSupportedCodecs { + protected: + void AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs); + void ParseCodecs(); + TVector<NPersQueue::ECodec> GetCodecs(); + + private: + TString SupportedCodecsStr_; + TVector<NPersQueue::ECodec> AllowedCodecs_; + TVector<NPersQueue::ECodec> SupportedCodecs_; + }; + + class TCommandStream : public TClientCommandTree { + public: + TCommandStream(); + }; + + + class TCommandStreamCreate : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs { + public: + TCommandStreamCreate(); + void Config(TConfig &config) override; + void Parse(TConfig &config) override; + int Run(TConfig &config) override; + + private: + ui64 RetentionPeriodHours_; + ui32 PartitionsCount_; + }; + + class TCommandStreamAlter : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs { + public: + TCommandStreamAlter(); + void Config(TConfig &config) override; + void Parse(TConfig &config) override; + int Run(TConfig &config) override; + + private: + TMaybe<ui64> RetentionPeriodHours_; + TMaybe<ui32> PartitionsCount_; + NYdb::NPersQueue::TAlterTopicSettings PrepareAlterSettings(NYdb::NPersQueue::TDescribeTopicResult &describeResult); + }; + + class TCommandStreamDrop : public TYdbCommand, public TCommandWithStreamName { + public: + TCommandStreamDrop(); + void Config(TConfig &config) override; + void Parse(TConfig &config) override; + int Run(TConfig &config) override; + }; + + class TCommandStreamConsumer : public TClientCommandTree { + public: + TCommandStreamConsumer(); + }; + + class TCommandStreamConsumerAdd : public TYdbCommand, public TCommandWithStreamName { + public: + TCommandStreamConsumerAdd(); + void Config(TConfig &config) override; + void Parse(TConfig &config) override; + int Run(TConfig &config) override; + + private: + TString ConsumerName_; + TMaybe<ui64> StartingMessageTimestamp_; + }; + + class TCommandStreamConsumerDrop : public TYdbCommand, public TCommandWithStreamName { + public: + TCommandStreamConsumerDrop(); + void Config(TConfig &config) override; + void Parse(TConfig &config) override; + int Run(TConfig &config) override; + + private: + TString ConsumerName_; + }; }// namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/common/command.cpp b/ydb/public/lib/ydb_cli/common/command.cpp index ce7334d7388..5938c62edda 100644 --- a/ydb/public/lib/ydb_cli/common/command.cpp +++ b/ydb/public/lib/ydb_cli/common/command.cpp @@ -373,9 +373,9 @@ void TCommandWithPath::AdjustPath(const TClientCommand::TConfig& config) { NConsoleClient::AdjustPath(Path, config); } -void TCommandWithStreamName::ParseStreamName(const TClientCommand::TConfig &config, const size_t argPos) { - StreamName = config.ParseResult->GetFreeArgs()[argPos]; -} - +void TCommandWithStreamName::ParseStreamName(const TClientCommand::TConfig &config, const size_t argPos) { + StreamName = config.ParseResult->GetFreeArgs()[argPos]; } + } +} diff --git a/ydb/public/lib/ydb_cli/common/command.h b/ydb/public/lib/ydb_cli/common/command.h index 441206cdf7b..7f1287cb913 100644 --- a/ydb/public/lib/ydb_cli/common/command.h +++ b/ydb/public/lib/ydb_cli/common/command.h @@ -355,12 +355,12 @@ protected: TString Path; }; -class TCommandWithStreamName { -protected: - void ParseStreamName(const TClientCommand::TConfig& config, const size_t argPos); - - TString StreamName; -}; - +class TCommandWithStreamName { +protected: + void ParseStreamName(const TClientCommand::TConfig& config, const size_t argPos); + + TString StreamName; +}; + } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp index a22ed23cead..a9a22d46173 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp @@ -72,7 +72,7 @@ TString TCredentials::GetIamServiceAccountKey() const { TDescribeTopicResult::TDescribeTopicResult(TStatus status, const Ydb::PersQueue::V1::DescribeTopicResult& result) : TStatus(std::move(status)) , TopicSettings_(result.settings()) - , Proto_(result) + , Proto_(result) { } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index ee9aa75a0ae..10744196011 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -15,10 +15,10 @@ #include <exception> #include <variant> -namespace NYdb { - class TProtoAccessor; -} - +namespace NYdb { + class TProtoAccessor; +} + namespace NYdb::NPersQueue { enum class EFormat { @@ -57,7 +57,7 @@ private: // Result for describe resource request. struct TDescribeTopicResult : public TStatus { - friend class NYdb::TProtoAccessor; + friend class NYdb::TProtoAccessor; struct TTopicSettings { TTopicSettings(const Ydb::PersQueue::V1::TopicSettings&); @@ -158,10 +158,10 @@ struct TDescribeTopicResult : public TStatus { private: TTopicSettings TopicSettings_; - [[nodiscard]] const Ydb::PersQueue::V1::DescribeTopicResult& GetProto() const { - return Proto_; - } - const Ydb::PersQueue::V1::DescribeTopicResult Proto_; + [[nodiscard]] const Ydb::PersQueue::V1::DescribeTopicResult& GetProto() const { + return Proto_; + } + const Ydb::PersQueue::V1::DescribeTopicResult Proto_; }; using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp index cc5ce969e77..23b4844727e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp @@ -1,8 +1,8 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> - -namespace NYdb { - const Ydb::PersQueue::V1::DescribeTopicResult& TProtoAccessor::GetProto(const NPersQueue::TDescribeTopicResult& topicDescription) { - return topicDescription.GetProto(); - } + +namespace NYdb { + const Ydb::PersQueue::V1::DescribeTopicResult& TProtoAccessor::GetProto(const NPersQueue::TDescribeTopicResult& topicDescription) { + return topicDescription.GetProto(); + } }// namespace NYdb diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make index d19d4f6b2e6..0668c19b76c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make @@ -9,7 +9,7 @@ GENERATE_ENUM_SERIALIZATION(ydb/public/sdk/cpp/client/ydb_persqueue_core/persque SRCS( persqueue.h - proto_accessor.cpp + proto_accessor.cpp ) PEERDIR( diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index be0165d74bb..f99bc88c82e 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -34,7 +34,7 @@ public: static const Ydb::ResultSet& GetProto(const TResultSet& resultSet); static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); - static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription); + static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription); static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request); |