aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@yandex-team.ru>2022-02-10 16:52:22 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:22 +0300
commit22b078ed897cf3921584a23b29d2e7a42449fdd2 (patch)
treec454568897edc1f9e0a7624db04c1cc3bde0085d
parent794a2d21870524f63c281c0c368edf5a6cdec02c (diff)
downloadydb-22b078ed897cf3921584a23b29d2e7a42449fdd2.tar.gz
Restoring authorship annotation for <shmel1k@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/core/ymq/version.txt2
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp192
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h10
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp516
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_stream.h176
-rw-r--r--ydb/public/lib/ydb_cli/common/command.cpp8
-rw-r--r--ydb/public/lib/ydb_cli/common/command.h14
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h18
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.h2
13 files changed, 477 insertions, 477 deletions
diff --git a/ydb/core/ymq/version.txt b/ydb/core/ymq/version.txt
index ca4ce39a998..e8611ba5aa1 100644
--- a/ydb/core/ymq/version.txt
+++ b/ydb/core/ymq/version.txt
@@ -1 +1 @@
-1102889784.tags.releases.ydb.stable-21-4-37.hardening
+1102889784.tags.releases.ydb.stable-21-4-37.hardening
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp
index 2cccf42a41f..13a5192f038 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp
@@ -37,7 +37,7 @@ TClientCommandRootCommon::TClientCommandRootCommon(const TClientSettings& settin
AddCommand(std::make_unique<TCommandConfig>());
AddCommand(std::make_unique<TCommandInit>());
AddCommand(std::make_unique<TCommandYql>());
- AddCommand(std::make_unique<TCommandStream>());
+ AddCommand(std::make_unique<TCommandStream>());
AddCommand(std::make_unique<TCommandWorkload>());
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
index a6f81fe432a..1ec61b0d811 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
@@ -178,8 +178,8 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib
switch (entry.Type) {
case NScheme::ESchemeEntryType::Table:
return DescribeTable(driver);
- case NScheme::ESchemeEntryType::PqGroup:
- return DescribeStream(driver);
+ case NScheme::ESchemeEntryType::PqGroup:
+ return DescribeStream(driver);
default:
WarnAboutTableOptions();
PrintEntryVerbose(entry, ShowPermissions);
@@ -187,101 +187,101 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib
return EXIT_SUCCESS;
}
-namespace {
- TString FormatCodecs(const TVector<NYdb::NPersQueue::ECodec> codecs) {
- if (codecs.empty()) {
- return "";
- }
-
- TStringBuilder builder = TStringBuilder();
- for (unsigned int i = 0; i < codecs.size() - 1; ++i) {
- builder << codecs[i] << ", ";
- }
- builder << codecs[codecs.size() - 1];
- return ToString(builder);
- }
-
- void PrintStreamReadRules(
- const TVector<NYdb::NPersQueue::TDescribeTopicResult::
- TTopicSettings::TReadRule>& readRules) {
- if (readRules.empty()) {
- return;
- }
- TPrettyTable table(
- {"ConsumerName", "SupportedCodecs",
- "StartingMessageTimestamp", "Important",
- "ServiceType", "SupportedFormat", "Version"});
- for (const auto& rule: readRules) {
- table.AddRow()
- .Column(0, rule.ConsumerName())
- .Column(1, FormatCodecs(rule.SupportedCodecs()))
- .Column(2, rule.StartingMessageTimestamp().ToRfc822StringLocal())
- .Column(3, rule.Important())
- .Column(4, rule.ServiceType())
- .Column(5, rule.SupportedFormat())
- .Column(6, rule.Version());
- }
- Cout << Endl << "ReadRules: " << Endl;
- Cout << table;
- }
-}
-
-int TCommandDescribe::PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings &settings) {
- Cout << Endl << "RetentionPeriod: " << settings.RetentionPeriod().Hours() << " hours";
- Cout << Endl << "PartitionsCount: " << settings.PartitionsCount();
- Cout << Endl << "SupportedFormat: " << settings.SupportedFormat();
- if (!settings.SupportedCodecs().empty()) {
- Cout << Endl << "SupportedCodecs: " << FormatCodecs(settings.SupportedCodecs()) << Endl;
- }
- PrintStreamReadRules(settings.ReadRules());
- return EXIT_SUCCESS;
-}
-
-int TCommandDescribe::PrintStreamResponseProtoJsonBase64(
- const NYdb::NPersQueue::
- TDescribeTopicResult& result) {
- TString json;
- google::protobuf::util::JsonPrintOptions jsonOpts;
- jsonOpts.preserve_proto_field_names = true;
- auto convertStatus = google::protobuf::util::MessageToJsonString(
- TProtoAccessor::GetProto(result),
- &json,
- jsonOpts
- );
- if (convertStatus.ok()) {
- Cout << json << Endl;
- } else {
- Cerr << "Error occurred while converting result proto to json" << Endl;
- return EXIT_FAILURE;
- }
- return EXIT_SUCCESS;
-}
-
-int TCommandDescribe::PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result) {
- switch (OutputFormat) {
- case EOutputFormat::Default:
- case EOutputFormat::Pretty:
- PrintStreamResponsePretty(result.TopicSettings());
- break;
- case EOutputFormat::Json:
- Cerr << "Warning! Option --json is deprecated and will be removed soon. "
- << "Use \"--format proto-json-base64\" option instead." << Endl;
+namespace {
+ TString FormatCodecs(const TVector<NYdb::NPersQueue::ECodec> codecs) {
+ if (codecs.empty()) {
+ return "";
+ }
+
+ TStringBuilder builder = TStringBuilder();
+ for (unsigned int i = 0; i < codecs.size() - 1; ++i) {
+ builder << codecs[i] << ", ";
+ }
+ builder << codecs[codecs.size() - 1];
+ return ToString(builder);
+ }
+
+ void PrintStreamReadRules(
+ const TVector<NYdb::NPersQueue::TDescribeTopicResult::
+ TTopicSettings::TReadRule>& readRules) {
+ if (readRules.empty()) {
+ return;
+ }
+ TPrettyTable table(
+ {"ConsumerName", "SupportedCodecs",
+ "StartingMessageTimestamp", "Important",
+ "ServiceType", "SupportedFormat", "Version"});
+ for (const auto& rule: readRules) {
+ table.AddRow()
+ .Column(0, rule.ConsumerName())
+ .Column(1, FormatCodecs(rule.SupportedCodecs()))
+ .Column(2, rule.StartingMessageTimestamp().ToRfc822StringLocal())
+ .Column(3, rule.Important())
+ .Column(4, rule.ServiceType())
+ .Column(5, rule.SupportedFormat())
+ .Column(6, rule.Version());
+ }
+ Cout << Endl << "ReadRules: " << Endl;
+ Cout << table;
+ }
+}
+
+int TCommandDescribe::PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings &settings) {
+ Cout << Endl << "RetentionPeriod: " << settings.RetentionPeriod().Hours() << " hours";
+ Cout << Endl << "PartitionsCount: " << settings.PartitionsCount();
+ Cout << Endl << "SupportedFormat: " << settings.SupportedFormat();
+ if (!settings.SupportedCodecs().empty()) {
+ Cout << Endl << "SupportedCodecs: " << FormatCodecs(settings.SupportedCodecs()) << Endl;
+ }
+ PrintStreamReadRules(settings.ReadRules());
+ return EXIT_SUCCESS;
+}
+
+int TCommandDescribe::PrintStreamResponseProtoJsonBase64(
+ const NYdb::NPersQueue::
+ TDescribeTopicResult& result) {
+ TString json;
+ google::protobuf::util::JsonPrintOptions jsonOpts;
+ jsonOpts.preserve_proto_field_names = true;
+ auto convertStatus = google::protobuf::util::MessageToJsonString(
+ TProtoAccessor::GetProto(result),
+ &json,
+ jsonOpts
+ );
+ if (convertStatus.ok()) {
+ Cout << json << Endl;
+ } else {
+ Cerr << "Error occurred while converting result proto to json" << Endl;
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+}
+
+int TCommandDescribe::PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result) {
+ switch (OutputFormat) {
+ case EOutputFormat::Default:
+ case EOutputFormat::Pretty:
+ PrintStreamResponsePretty(result.TopicSettings());
+ break;
+ case EOutputFormat::Json:
+ Cerr << "Warning! Option --json is deprecated and will be removed soon. "
+ << "Use \"--format proto-json-base64\" option instead." << Endl;
[[fallthrough]];
- case EOutputFormat::ProtoJsonBase64:
- return PrintStreamResponseProtoJsonBase64(result);
- default:
- throw TMissUseException() << "This command doesn't support " << OutputFormat << " output format";
- }
- return EXIT_SUCCESS;
-}
-
-int TCommandDescribe::DescribeStream(TDriver& driver) {
- NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
- auto describeResult = persQueueClient.DescribeTopic(Path).GetValueSync();
- ThrowOnError(describeResult);
- return PrintStreamResponse(describeResult);
-}
-
+ case EOutputFormat::ProtoJsonBase64:
+ return PrintStreamResponseProtoJsonBase64(result);
+ default:
+ throw TMissUseException() << "This command doesn't support " << OutputFormat << " output format";
+ }
+ return EXIT_SUCCESS;
+}
+
+int TCommandDescribe::DescribeStream(TDriver& driver) {
+ NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
+ auto describeResult = persQueueClient.DescribeTopic(Path).GetValueSync();
+ ThrowOnError(describeResult);
+ return PrintStreamResponse(describeResult);
+}
+
int TCommandDescribe::DescribeTable(TDriver& driver) {
NTable::TTableClient client(driver);
NTable::TCreateSessionResult sessionResult = client.GetSession(NTable::TCreateSessionSettings()).GetValueSync();
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
index c87e4929f38..ec0a5ac68b5 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
@@ -48,11 +48,11 @@ private:
int PrintResponseProtoJsonBase64(const NTable::TTableDescription& tableDescription);
void WarnAboutTableOptions();
- int DescribeStream(TDriver& driver);
- int PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result);
- int PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings& settings);
- int PrintStreamResponseProtoJsonBase64(const NYdb::NPersQueue::TDescribeTopicResult& result);
-
+ int DescribeStream(TDriver& driver);
+ int PrintStreamResponse(const NYdb::NPersQueue::TDescribeTopicResult& result);
+ int PrintStreamResponsePretty(const NYdb::NPersQueue::TDescribeTopicResult::TTopicSettings& settings);
+ int PrintStreamResponseProtoJsonBase64(const NYdb::NPersQueue::TDescribeTopicResult& result);
+
// Common options
bool ShowPermissions = false;
// Table options
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp
index 24b478f8810..6c6f222fa1f 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.cpp
@@ -1,268 +1,268 @@
-#include "ydb_service_stream.h"
+#include "ydb_service_stream.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
#include <ydb/public/lib/ydb_cli/common/command.h>
-
-#include <util/stream/str.h>
-#include <util/string/vector.h>
-
-
-namespace NYdb::NConsoleClient {
- namespace {
- THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptions = {
- {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression"},
- {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"},
- {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"},
- {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"},
- };
-
- THashMap<TString, NYdb::NPersQueue::ECodec> ExistingCodecs = {
- std::pair<TString, NYdb::NPersQueue::ECodec>("raw", NYdb::NPersQueue::ECodec::RAW),
- std::pair<TString, NYdb::NPersQueue::ECodec>("gzip", NYdb::NPersQueue::ECodec::GZIP),
- std::pair<TString, NYdb::NPersQueue::ECodec>("lzop", NYdb::NPersQueue::ECodec::LZOP),
- std::pair<TString, NYdb::NPersQueue::ECodec>("zstd", NYdb::NPersQueue::ECodec::ZSTD),
- };
- }// namespace
-
- void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs) {
- TStringStream description;
- description << "Comma-separated list of supported codecs. Available codecs: ";
- NColorizer::TColors colors = NColorizer::AutoColors(Cout);
- for (const auto &codec : supportedCodecs) {
- auto findResult = CodecsDescriptions.find(codec);
- Y_VERIFY(findResult != CodecsDescriptions.end(),
- "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
- description << "\n " << colors.BoldColor() << codec << colors.OldColor()
- << "\n " << findResult->second;
- }
- config.Opts->AddLongOption("supported-codecs", description.Str())
- .RequiredArgument("STRING")
- .StoreResult(&SupportedCodecsStr_);
- AllowedCodecs_ = supportedCodecs;
- }
-
- void TCommandWithSupportedCodecs::ParseCodecs() {
- if (SupportedCodecsStr_.empty()) {
- return;
- }
-
- TVector<NPersQueue::ECodec> parsedCodecs;
- TVector<TString> split = SplitString(SupportedCodecsStr_, ",");
- for (const TString &codecStr : split) {
- auto exists = ExistingCodecs.find(to_lower(codecStr));
- if (exists == ExistingCodecs.end()) {
- throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command";
- }
-
- if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) {
- throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command";
- }
-
- SupportedCodecs_.push_back(exists->second);
- }
- }
-
- TVector<NPersQueue::ECodec> TCommandWithSupportedCodecs::GetCodecs() {
- return SupportedCodecs_;
- }
-
- TCommandStream::TCommandStream()
- : TClientCommandTree("stream", {}, "DataStreams service operations") {
+
+#include <util/stream/str.h>
+#include <util/string/vector.h>
+
+
+namespace NYdb::NConsoleClient {
+ namespace {
+ THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptions = {
+ {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression"},
+ {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"},
+ {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"},
+ {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"},
+ };
+
+ THashMap<TString, NYdb::NPersQueue::ECodec> ExistingCodecs = {
+ std::pair<TString, NYdb::NPersQueue::ECodec>("raw", NYdb::NPersQueue::ECodec::RAW),
+ std::pair<TString, NYdb::NPersQueue::ECodec>("gzip", NYdb::NPersQueue::ECodec::GZIP),
+ std::pair<TString, NYdb::NPersQueue::ECodec>("lzop", NYdb::NPersQueue::ECodec::LZOP),
+ std::pair<TString, NYdb::NPersQueue::ECodec>("zstd", NYdb::NPersQueue::ECodec::ZSTD),
+ };
+ }// namespace
+
+ void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs) {
+ TStringStream description;
+ description << "Comma-separated list of supported codecs. Available codecs: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto &codec : supportedCodecs) {
+ auto findResult = CodecsDescriptions.find(codec);
+ Y_VERIFY(findResult != CodecsDescriptions.end(),
+ "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
+ description << "\n " << colors.BoldColor() << codec << colors.OldColor()
+ << "\n " << findResult->second;
+ }
+ config.Opts->AddLongOption("supported-codecs", description.Str())
+ .RequiredArgument("STRING")
+ .StoreResult(&SupportedCodecsStr_);
+ AllowedCodecs_ = supportedCodecs;
+ }
+
+ void TCommandWithSupportedCodecs::ParseCodecs() {
+ if (SupportedCodecsStr_.empty()) {
+ return;
+ }
+
+ TVector<NPersQueue::ECodec> parsedCodecs;
+ TVector<TString> split = SplitString(SupportedCodecsStr_, ",");
+ for (const TString &codecStr : split) {
+ auto exists = ExistingCodecs.find(to_lower(codecStr));
+ if (exists == ExistingCodecs.end()) {
+ throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command";
+ }
+
+ if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) {
+ throw TMissUseException() << "Supported codec " << codecStr << " is not available for this command";
+ }
+
+ SupportedCodecs_.push_back(exists->second);
+ }
+ }
+
+ TVector<NPersQueue::ECodec> TCommandWithSupportedCodecs::GetCodecs() {
+ return SupportedCodecs_;
+ }
+
+ TCommandStream::TCommandStream()
+ : TClientCommandTree("stream", {}, "DataStreams service operations") {
AddCommand(std::make_unique<TCommandStreamCreate>());
AddCommand(std::make_unique<TCommandStreamAlter>());
AddCommand(std::make_unique<TCommandStreamDrop>());
AddCommand(std::make_unique<TCommandStreamConsumer>());
- }
-
- TCommandStreamCreate::TCommandStreamCreate()
- : TYdbCommand("create", {}, "Create stream command") {}
-
- void TCommandStreamCreate::Config(TConfig &config) {
- TYdbCommand::Config(config);
- config.Opts->AddLongOption("partitions-count", "Total partitions count for stream")
- .DefaultValue(1)
- .StoreResult(&PartitionsCount_);
- config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in stream is stored")
- .DefaultValue(18)
- .Optional()
- .StoreResult(&RetentionPeriodHours_);
- config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "New stream name");
- AddAllowedCodecs(config, AllowedCodecs);
- }
-
- void TCommandStreamCreate::Parse(TConfig &config) {
- TYdbCommand::Parse(config);
- ParseStreamName(config, 0);
- ParseCodecs();
- }
-
- int TCommandStreamCreate::Run(TConfig &config) {
- TDriver driver = CreateDriver(config);
- NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
-
- auto settings = NYdb::NPersQueue::TCreateTopicSettings();
- settings.PartitionsCount(PartitionsCount_);
- settings.MaxPartitionWriteSpeed(1024 * 1024);
- settings.MaxPartitionWriteBurst(1024 * 1024);
- auto codecs = GetCodecs();
- if (!codecs.empty()) {
- settings.SupportedCodecs(GetCodecs());
- }
- settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_));
-
- auto status = persQueueClient.CreateTopic(StreamName, settings).GetValueSync();
- ThrowOnError(status);
- return EXIT_SUCCESS;
- }
-
- TCommandStreamAlter::TCommandStreamAlter() : TYdbCommand("alter", {}, "Alter stream command") {}
-
- void TCommandStreamAlter::Config(TConfig &config) {
- TYdbCommand::Config(config);
- config.Opts->AddLongOption("partitions-count", "Total partitions count for stream")
- .StoreResult(&PartitionsCount_);
- config.Opts->AddLongOption("retention-period-hours", "Duration for which data in stream is stored")
- .Optional()
- .StoreResult(&RetentionPeriodHours_);
- config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "Stream to alter");
- AddAllowedCodecs(config, AllowedCodecs);
- }
-
- void TCommandStreamAlter::Parse(TConfig &config) {
- TYdbCommand::Parse(config);
- ParseStreamName(config, 0);
- ParseCodecs();
- }
-
- NYdb::NPersQueue::TAlterTopicSettings TCommandStreamAlter::PrepareAlterSettings(
- NYdb::NPersQueue::TDescribeTopicResult &describeResult) {
- auto settings = NYdb::NPersQueue::TAlterTopicSettings();
- settings.SetSettings(describeResult.TopicSettings());
-
- if (PartitionsCount_.Defined()) {
- settings.PartitionsCount(*PartitionsCount_.Get());
- }
-
- auto codecs = GetCodecs();
- if (!codecs.empty()) {
- settings.SupportedCodecs(codecs);
- }
-
- if (RetentionPeriodHours_.Defined()) {
- settings.RetentionPeriod(TDuration::Hours(*RetentionPeriodHours_.Get()));
- }
-
- return settings;
- }
-
- int TCommandStreamAlter::Run(TConfig &config) {
- if (!PartitionsCount_.Defined() && GetCodecs().empty() && !RetentionPeriodHours_.Defined()) {
- return EXIT_SUCCESS;
- }
-
- TDriver driver = CreateDriver(config);
- NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
-
- auto describeResult = persQueueClient.DescribeTopic(StreamName).GetValueSync();
- ThrowOnError(describeResult);
-
- auto settings = PrepareAlterSettings(describeResult);
- auto result = persQueueClient.AlterTopic(StreamName, settings).GetValueSync();
- ThrowOnError(result);
- return EXIT_SUCCESS;
- }
-
- TCommandStreamDrop::TCommandStreamDrop() : TYdbCommand("drop", {}, "Drop stream command") {}
-
- void TCommandStreamDrop::Parse(TConfig &config) {
- TYdbCommand::Parse(config);
- ParseStreamName(config, 0);
- }
-
- void TCommandStreamDrop::Config(TConfig &config) {
- TYdbCommand::Config(config);
- config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "Stream which will be dropped");
- }
-
- int TCommandStreamDrop::Run(TConfig &config) {
- TDriver driver = CreateDriver(config);
- NPersQueue::TPersQueueClient persQueueClient(driver);
-
- auto settings = NYdb::NPersQueue::TDropTopicSettings();
- TStatus status = persQueueClient.DropTopic(StreamName, settings).GetValueSync();
- ThrowOnError(status);
- return EXIT_SUCCESS;
- }
-
- TCommandStreamConsumer::TCommandStreamConsumer() : TClientCommandTree("consumer", {}, "Consumer operations") {
+ }
+
+ TCommandStreamCreate::TCommandStreamCreate()
+ : TYdbCommand("create", {}, "Create stream command") {}
+
+ void TCommandStreamCreate::Config(TConfig &config) {
+ TYdbCommand::Config(config);
+ config.Opts->AddLongOption("partitions-count", "Total partitions count for stream")
+ .DefaultValue(1)
+ .StoreResult(&PartitionsCount_);
+ config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in stream is stored")
+ .DefaultValue(18)
+ .Optional()
+ .StoreResult(&RetentionPeriodHours_);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "New stream name");
+ AddAllowedCodecs(config, AllowedCodecs);
+ }
+
+ void TCommandStreamCreate::Parse(TConfig &config) {
+ TYdbCommand::Parse(config);
+ ParseStreamName(config, 0);
+ ParseCodecs();
+ }
+
+ int TCommandStreamCreate::Run(TConfig &config) {
+ TDriver driver = CreateDriver(config);
+ NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
+
+ auto settings = NYdb::NPersQueue::TCreateTopicSettings();
+ settings.PartitionsCount(PartitionsCount_);
+ settings.MaxPartitionWriteSpeed(1024 * 1024);
+ settings.MaxPartitionWriteBurst(1024 * 1024);
+ auto codecs = GetCodecs();
+ if (!codecs.empty()) {
+ settings.SupportedCodecs(GetCodecs());
+ }
+ settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_));
+
+ auto status = persQueueClient.CreateTopic(StreamName, settings).GetValueSync();
+ ThrowOnError(status);
+ return EXIT_SUCCESS;
+ }
+
+ TCommandStreamAlter::TCommandStreamAlter() : TYdbCommand("alter", {}, "Alter stream command") {}
+
+ void TCommandStreamAlter::Config(TConfig &config) {
+ TYdbCommand::Config(config);
+ config.Opts->AddLongOption("partitions-count", "Total partitions count for stream")
+ .StoreResult(&PartitionsCount_);
+ config.Opts->AddLongOption("retention-period-hours", "Duration for which data in stream is stored")
+ .Optional()
+ .StoreResult(&RetentionPeriodHours_);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "Stream to alter");
+ AddAllowedCodecs(config, AllowedCodecs);
+ }
+
+ void TCommandStreamAlter::Parse(TConfig &config) {
+ TYdbCommand::Parse(config);
+ ParseStreamName(config, 0);
+ ParseCodecs();
+ }
+
+ NYdb::NPersQueue::TAlterTopicSettings TCommandStreamAlter::PrepareAlterSettings(
+ NYdb::NPersQueue::TDescribeTopicResult &describeResult) {
+ auto settings = NYdb::NPersQueue::TAlterTopicSettings();
+ settings.SetSettings(describeResult.TopicSettings());
+
+ if (PartitionsCount_.Defined()) {
+ settings.PartitionsCount(*PartitionsCount_.Get());
+ }
+
+ auto codecs = GetCodecs();
+ if (!codecs.empty()) {
+ settings.SupportedCodecs(codecs);
+ }
+
+ if (RetentionPeriodHours_.Defined()) {
+ settings.RetentionPeriod(TDuration::Hours(*RetentionPeriodHours_.Get()));
+ }
+
+ return settings;
+ }
+
+ int TCommandStreamAlter::Run(TConfig &config) {
+ if (!PartitionsCount_.Defined() && GetCodecs().empty() && !RetentionPeriodHours_.Defined()) {
+ return EXIT_SUCCESS;
+ }
+
+ TDriver driver = CreateDriver(config);
+ NYdb::NPersQueue::TPersQueueClient persQueueClient(driver);
+
+ auto describeResult = persQueueClient.DescribeTopic(StreamName).GetValueSync();
+ ThrowOnError(describeResult);
+
+ auto settings = PrepareAlterSettings(describeResult);
+ auto result = persQueueClient.AlterTopic(StreamName, settings).GetValueSync();
+ ThrowOnError(result);
+ return EXIT_SUCCESS;
+ }
+
+ TCommandStreamDrop::TCommandStreamDrop() : TYdbCommand("drop", {}, "Drop stream command") {}
+
+ void TCommandStreamDrop::Parse(TConfig &config) {
+ TYdbCommand::Parse(config);
+ ParseStreamName(config, 0);
+ }
+
+ void TCommandStreamDrop::Config(TConfig &config) {
+ TYdbCommand::Config(config);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "Stream which will be dropped");
+ }
+
+ int TCommandStreamDrop::Run(TConfig &config) {
+ TDriver driver = CreateDriver(config);
+ NPersQueue::TPersQueueClient persQueueClient(driver);
+
+ auto settings = NYdb::NPersQueue::TDropTopicSettings();
+ TStatus status = persQueueClient.DropTopic(StreamName, settings).GetValueSync();
+ ThrowOnError(status);
+ return EXIT_SUCCESS;
+ }
+
+ TCommandStreamConsumer::TCommandStreamConsumer() : TClientCommandTree("consumer", {}, "Consumer operations") {
AddCommand(std::make_unique<TCommandStreamConsumerAdd>());
AddCommand(std::make_unique<TCommandStreamConsumerDrop>());
- }
-
- TCommandStreamConsumerAdd::TCommandStreamConsumerAdd() : TYdbCommand("add", {}, "Consumer add operation") {}
-
- void TCommandStreamConsumerAdd::Config(TConfig &config) {
- TYdbCommand::Parse(config);
- config.Opts->AddLongOption("consumer-name", "New consumer for stream")
- .Required()
- .StoreResult(&ConsumerName_);
- config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
- .Optional()
- .StoreResult(&StartingMessageTimestamp_);
- config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "Stream for which consumer will be added");
- }
-
- void TCommandStreamConsumerAdd::Parse(TConfig &config) {
- TYdbCommand::Parse(config);
- ParseStreamName(config, 0);
- }
-
- int TCommandStreamConsumerAdd::Run(TConfig &config) {
- TDriver driver = CreateDriver(config);
- NPersQueue::TPersQueueClient persQueueClient(driver);
-
- NYdb::NPersQueue::TReadRuleSettings readRuleSettings = NYdb::NPersQueue::TReadRuleSettings();
- readRuleSettings.ConsumerName(ConsumerName_);
- if (StartingMessageTimestamp_.Defined()) {
- readRuleSettings.StartingMessageTimestamp(TInstant::Seconds(*StartingMessageTimestamp_.Get()));
- }
-
- auto addReadRuleSettings = NYdb::NPersQueue::TAddReadRuleSettings();
- addReadRuleSettings.ReadRule(readRuleSettings);
- TStatus status = persQueueClient.AddReadRule(StreamName, addReadRuleSettings).GetValueSync();
- ThrowOnError(status);
- return EXIT_SUCCESS;
- }
-
- TCommandStreamConsumerDrop::TCommandStreamConsumerDrop() : TYdbCommand("drop", {}, "Consumer drop operation") {}
-
- void TCommandStreamConsumerDrop::Config(TConfig &config) {
- TYdbCommand::Config(config);
- config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped")
- .Required()
- .StoreResult(&ConsumerName_);
- config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "Stream from which consumer will be dropped");
- }
-
- void TCommandStreamConsumerDrop::Parse(TConfig &config) {
- TYdbCommand::Parse(config);
- ParseStreamName(config, 0);
- }
-
- int TCommandStreamConsumerDrop::Run(TConfig &config) {
- TDriver driver = CreateDriver(config);
- NPersQueue::TPersQueueClient persQueueClient(driver);
-
- NYdb::NPersQueue::TRemoveReadRuleSettings removeReadRuleSettings = NYdb::NPersQueue::TRemoveReadRuleSettings();
- removeReadRuleSettings.ConsumerName(ConsumerName_);
-
- TStatus status = persQueueClient.RemoveReadRule(StreamName, removeReadRuleSettings).GetValueSync();
- ThrowOnError(status);
- return EXIT_SUCCESS;
- }
+ }
+
+ TCommandStreamConsumerAdd::TCommandStreamConsumerAdd() : TYdbCommand("add", {}, "Consumer add operation") {}
+
+ void TCommandStreamConsumerAdd::Config(TConfig &config) {
+ TYdbCommand::Parse(config);
+ config.Opts->AddLongOption("consumer-name", "New consumer for stream")
+ .Required()
+ .StoreResult(&ConsumerName_);
+ config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
+ .Optional()
+ .StoreResult(&StartingMessageTimestamp_);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "Stream for which consumer will be added");
+ }
+
+ void TCommandStreamConsumerAdd::Parse(TConfig &config) {
+ TYdbCommand::Parse(config);
+ ParseStreamName(config, 0);
+ }
+
+ int TCommandStreamConsumerAdd::Run(TConfig &config) {
+ TDriver driver = CreateDriver(config);
+ NPersQueue::TPersQueueClient persQueueClient(driver);
+
+ NYdb::NPersQueue::TReadRuleSettings readRuleSettings = NYdb::NPersQueue::TReadRuleSettings();
+ readRuleSettings.ConsumerName(ConsumerName_);
+ if (StartingMessageTimestamp_.Defined()) {
+ readRuleSettings.StartingMessageTimestamp(TInstant::Seconds(*StartingMessageTimestamp_.Get()));
+ }
+
+ auto addReadRuleSettings = NYdb::NPersQueue::TAddReadRuleSettings();
+ addReadRuleSettings.ReadRule(readRuleSettings);
+ TStatus status = persQueueClient.AddReadRule(StreamName, addReadRuleSettings).GetValueSync();
+ ThrowOnError(status);
+ return EXIT_SUCCESS;
+ }
+
+ TCommandStreamConsumerDrop::TCommandStreamConsumerDrop() : TYdbCommand("drop", {}, "Consumer drop operation") {}
+
+ void TCommandStreamConsumerDrop::Config(TConfig &config) {
+ TYdbCommand::Config(config);
+ config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped")
+ .Required()
+ .StoreResult(&ConsumerName_);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "Stream from which consumer will be dropped");
+ }
+
+ void TCommandStreamConsumerDrop::Parse(TConfig &config) {
+ TYdbCommand::Parse(config);
+ ParseStreamName(config, 0);
+ }
+
+ int TCommandStreamConsumerDrop::Run(TConfig &config) {
+ TDriver driver = CreateDriver(config);
+ NPersQueue::TPersQueueClient persQueueClient(driver);
+
+ NYdb::NPersQueue::TRemoveReadRuleSettings removeReadRuleSettings = NYdb::NPersQueue::TRemoveReadRuleSettings();
+ removeReadRuleSettings.ConsumerName(ConsumerName_);
+
+ TStatus status = persQueueClient.RemoveReadRule(StreamName, removeReadRuleSettings).GetValueSync();
+ ThrowOnError(status);
+ return EXIT_SUCCESS;
+ }
}// namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h
index f16d3b39b74..13ac9117133 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_stream.h
@@ -1,90 +1,90 @@
-#pragma once
-
-#include "ydb_command.h"
-#include "ydb_common.h"
-
+#pragma once
+
+#include "ydb_command.h"
+#include "ydb_common.h"
+
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
-
-namespace NYdb::NConsoleClient {
- TVector<NYdb::NPersQueue::ECodec> InitAllowedCodecs();
- const TVector<NYdb::NPersQueue::ECodec> AllowedCodecs = InitAllowedCodecs();
-
- class TCommandWithSupportedCodecs {
- protected:
- void AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs);
- void ParseCodecs();
- TVector<NPersQueue::ECodec> GetCodecs();
-
- private:
- TString SupportedCodecsStr_;
- TVector<NPersQueue::ECodec> AllowedCodecs_;
- TVector<NPersQueue::ECodec> SupportedCodecs_;
- };
-
- class TCommandStream : public TClientCommandTree {
- public:
- TCommandStream();
- };
-
-
- class TCommandStreamCreate : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs {
- public:
- TCommandStreamCreate();
- void Config(TConfig &config) override;
- void Parse(TConfig &config) override;
- int Run(TConfig &config) override;
-
- private:
- ui64 RetentionPeriodHours_;
- ui32 PartitionsCount_;
- };
-
- class TCommandStreamAlter : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs {
- public:
- TCommandStreamAlter();
- void Config(TConfig &config) override;
- void Parse(TConfig &config) override;
- int Run(TConfig &config) override;
-
- private:
- TMaybe<ui64> RetentionPeriodHours_;
- TMaybe<ui32> PartitionsCount_;
- NYdb::NPersQueue::TAlterTopicSettings PrepareAlterSettings(NYdb::NPersQueue::TDescribeTopicResult &describeResult);
- };
-
- class TCommandStreamDrop : public TYdbCommand, public TCommandWithStreamName {
- public:
- TCommandStreamDrop();
- void Config(TConfig &config) override;
- void Parse(TConfig &config) override;
- int Run(TConfig &config) override;
- };
-
- class TCommandStreamConsumer : public TClientCommandTree {
- public:
- TCommandStreamConsumer();
- };
-
- class TCommandStreamConsumerAdd : public TYdbCommand, public TCommandWithStreamName {
- public:
- TCommandStreamConsumerAdd();
- void Config(TConfig &config) override;
- void Parse(TConfig &config) override;
- int Run(TConfig &config) override;
-
- private:
- TString ConsumerName_;
- TMaybe<ui64> StartingMessageTimestamp_;
- };
-
- class TCommandStreamConsumerDrop : public TYdbCommand, public TCommandWithStreamName {
- public:
- TCommandStreamConsumerDrop();
- void Config(TConfig &config) override;
- void Parse(TConfig &config) override;
- int Run(TConfig &config) override;
-
- private:
- TString ConsumerName_;
- };
+
+namespace NYdb::NConsoleClient {
+ TVector<NYdb::NPersQueue::ECodec> InitAllowedCodecs();
+ const TVector<NYdb::NPersQueue::ECodec> AllowedCodecs = InitAllowedCodecs();
+
+ class TCommandWithSupportedCodecs {
+ protected:
+ void AddAllowedCodecs(TClientCommand::TConfig &config, const TVector<NPersQueue::ECodec> &supportedCodecs);
+ void ParseCodecs();
+ TVector<NPersQueue::ECodec> GetCodecs();
+
+ private:
+ TString SupportedCodecsStr_;
+ TVector<NPersQueue::ECodec> AllowedCodecs_;
+ TVector<NPersQueue::ECodec> SupportedCodecs_;
+ };
+
+ class TCommandStream : public TClientCommandTree {
+ public:
+ TCommandStream();
+ };
+
+
+ class TCommandStreamCreate : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs {
+ public:
+ TCommandStreamCreate();
+ void Config(TConfig &config) override;
+ void Parse(TConfig &config) override;
+ int Run(TConfig &config) override;
+
+ private:
+ ui64 RetentionPeriodHours_;
+ ui32 PartitionsCount_;
+ };
+
+ class TCommandStreamAlter : public TYdbCommand, public TCommandWithStreamName, public TCommandWithSupportedCodecs {
+ public:
+ TCommandStreamAlter();
+ void Config(TConfig &config) override;
+ void Parse(TConfig &config) override;
+ int Run(TConfig &config) override;
+
+ private:
+ TMaybe<ui64> RetentionPeriodHours_;
+ TMaybe<ui32> PartitionsCount_;
+ NYdb::NPersQueue::TAlterTopicSettings PrepareAlterSettings(NYdb::NPersQueue::TDescribeTopicResult &describeResult);
+ };
+
+ class TCommandStreamDrop : public TYdbCommand, public TCommandWithStreamName {
+ public:
+ TCommandStreamDrop();
+ void Config(TConfig &config) override;
+ void Parse(TConfig &config) override;
+ int Run(TConfig &config) override;
+ };
+
+ class TCommandStreamConsumer : public TClientCommandTree {
+ public:
+ TCommandStreamConsumer();
+ };
+
+ class TCommandStreamConsumerAdd : public TYdbCommand, public TCommandWithStreamName {
+ public:
+ TCommandStreamConsumerAdd();
+ void Config(TConfig &config) override;
+ void Parse(TConfig &config) override;
+ int Run(TConfig &config) override;
+
+ private:
+ TString ConsumerName_;
+ TMaybe<ui64> StartingMessageTimestamp_;
+ };
+
+ class TCommandStreamConsumerDrop : public TYdbCommand, public TCommandWithStreamName {
+ public:
+ TCommandStreamConsumerDrop();
+ void Config(TConfig &config) override;
+ void Parse(TConfig &config) override;
+ int Run(TConfig &config) override;
+
+ private:
+ TString ConsumerName_;
+ };
}// namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/common/command.cpp b/ydb/public/lib/ydb_cli/common/command.cpp
index ce7334d7388..5938c62edda 100644
--- a/ydb/public/lib/ydb_cli/common/command.cpp
+++ b/ydb/public/lib/ydb_cli/common/command.cpp
@@ -373,9 +373,9 @@ void TCommandWithPath::AdjustPath(const TClientCommand::TConfig& config) {
NConsoleClient::AdjustPath(Path, config);
}
-void TCommandWithStreamName::ParseStreamName(const TClientCommand::TConfig &config, const size_t argPos) {
- StreamName = config.ParseResult->GetFreeArgs()[argPos];
-}
-
+void TCommandWithStreamName::ParseStreamName(const TClientCommand::TConfig &config, const size_t argPos) {
+ StreamName = config.ParseResult->GetFreeArgs()[argPos];
}
+
}
+}
diff --git a/ydb/public/lib/ydb_cli/common/command.h b/ydb/public/lib/ydb_cli/common/command.h
index 441206cdf7b..7f1287cb913 100644
--- a/ydb/public/lib/ydb_cli/common/command.h
+++ b/ydb/public/lib/ydb_cli/common/command.h
@@ -355,12 +355,12 @@ protected:
TString Path;
};
-class TCommandWithStreamName {
-protected:
- void ParseStreamName(const TClientCommand::TConfig& config, const size_t argPos);
-
- TString StreamName;
-};
-
+class TCommandWithStreamName {
+protected:
+ void ParseStreamName(const TClientCommand::TConfig& config, const size_t argPos);
+
+ TString StreamName;
+};
+
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
index a22ed23cead..a9a22d46173 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
@@ -72,7 +72,7 @@ TString TCredentials::GetIamServiceAccountKey() const {
TDescribeTopicResult::TDescribeTopicResult(TStatus status, const Ydb::PersQueue::V1::DescribeTopicResult& result)
: TStatus(std::move(status))
, TopicSettings_(result.settings())
- , Proto_(result)
+ , Proto_(result)
{
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index ee9aa75a0ae..10744196011 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -15,10 +15,10 @@
#include <exception>
#include <variant>
-namespace NYdb {
- class TProtoAccessor;
-}
-
+namespace NYdb {
+ class TProtoAccessor;
+}
+
namespace NYdb::NPersQueue {
enum class EFormat {
@@ -57,7 +57,7 @@ private:
// Result for describe resource request.
struct TDescribeTopicResult : public TStatus {
- friend class NYdb::TProtoAccessor;
+ friend class NYdb::TProtoAccessor;
struct TTopicSettings {
TTopicSettings(const Ydb::PersQueue::V1::TopicSettings&);
@@ -158,10 +158,10 @@ struct TDescribeTopicResult : public TStatus {
private:
TTopicSettings TopicSettings_;
- [[nodiscard]] const Ydb::PersQueue::V1::DescribeTopicResult& GetProto() const {
- return Proto_;
- }
- const Ydb::PersQueue::V1::DescribeTopicResult Proto_;
+ [[nodiscard]] const Ydb::PersQueue::V1::DescribeTopicResult& GetProto() const {
+ return Proto_;
+ }
+ const Ydb::PersQueue::V1::DescribeTopicResult Proto_;
};
using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp
index cc5ce969e77..23b4844727e 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/proto_accessor.cpp
@@ -1,8 +1,8 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-
-namespace NYdb {
- const Ydb::PersQueue::V1::DescribeTopicResult& TProtoAccessor::GetProto(const NPersQueue::TDescribeTopicResult& topicDescription) {
- return topicDescription.GetProto();
- }
+
+namespace NYdb {
+ const Ydb::PersQueue::V1::DescribeTopicResult& TProtoAccessor::GetProto(const NPersQueue::TDescribeTopicResult& topicDescription) {
+ return topicDescription.GetProto();
+ }
}// namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
index d19d4f6b2e6..0668c19b76c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
@@ -9,7 +9,7 @@ GENERATE_ENUM_SERIALIZATION(ydb/public/sdk/cpp/client/ydb_persqueue_core/persque
SRCS(
persqueue.h
- proto_accessor.cpp
+ proto_accessor.cpp
)
PEERDIR(
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
index be0165d74bb..f99bc88c82e 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
@@ -34,7 +34,7 @@ public:
static const Ydb::ResultSet& GetProto(const TResultSet& resultSet);
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
- static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription);
+ static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription);
static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats);
static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request);