aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-06-24 18:03:03 +0300
committeralexnick <alexnick@yandex-team.ru>2022-06-24 18:03:03 +0300
commitd75b40213877270640e39cc7d734a0a4c7902230 (patch)
tree854c2b17d76434e199a4e2de53c8c1975e1097f7
parent4ef1d84f3f2f798b9c95129e716afd50cdb46234 (diff)
downloadydb-d75b40213877270640e39cc7d734a0a4c7902230.tar.gz
ydb_topic SDK for control-plane
ref:19e0ac5211da860b6331c2f320c679533d60741c
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt27
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt25
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp172
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h199
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h437
-rw-r--r--ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h8
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp78
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt2
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux.txt2
14 files changed, 970 insertions, 5 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index f8b33fb898..2806d3da7a 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1007,6 +1007,8 @@ add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 8a32270867..d3d40e4aee 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1103,6 +1103,8 @@ add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
index 66385d41a8..4c5eb570f5 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
@@ -52,10 +52,18 @@ namespace NKikimr::NPersQueueTests {
std::optional<TString> codec,
std::optional<bool> reconnectOnFailure
) {
- Y_UNUSED(codec);
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
+ if (codec) {
+ if (*codec == "raw")
+ settings.Codec(ECodec::RAW);
+ if (*codec == "zstd")
+ settings.Codec(ECodec::ZSTD);
+ if (*codec == "lzop")
+ settings.Codec(ECodec::LZOP);
+ }
+ settings.MaxMemoryUsage(1024*1024*1024*1024ll);
return CreateSimpleWriter(driver, settings);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
index 88491acab6..2c43474639 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
@@ -6,11 +6,13 @@
#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/public/api/protos/ydb_import.pb.h>
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
+#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_export/export.h>
#include <ydb/public/sdk/cpp/client/ydb_import/import.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
namespace NYdb {
@@ -35,6 +37,7 @@ public:
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription);
+ static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription);
static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats);
static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt
new file mode 100644
index 0000000000..98ca02cc8d
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt
@@ -0,0 +1,27 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-client-ydb_topic)
+target_link_libraries(cpp-client-ydb_topic PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ tools-enum_parser-enum_serialization_runtime
+ library-cpp-retry
+ client-ydb_topic-impl
+ cpp-client-ydb_proto
+ cpp-client-ydb_driver
+)
+target_sources(cpp-client-ydb_topic PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
+)
+generate_enum_serilization(cpp-client-ydb_topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+ INCLUDE_HEADERS
+ ydb/public/sdk/cpp/client/ydb_topic/topic.h
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
new file mode 100644
index 0000000000..24edb06472
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
@@ -0,0 +1,25 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_topic-impl)
+target_link_libraries(client-ydb_topic-impl PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-client
+ cpp-monlib-dynamic_counters
+ cpp-string_utils-url
+ library-persqueue-obfuscate
+ api-grpc-draft
+ impl-ydb_internal-make_request
+ client-ydb_common_client-impl
+ cpp-client-ydb_driver
+)
+target_sources(client-ydb_topic-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
new file mode 100644
index 0000000000..473d473878
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -0,0 +1,172 @@
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h>
+
+#include <ydb/library/persqueue/obfuscate/obfuscate.h>
+
+#include <util/random/random.h>
+#include <util/string/cast.h>
+#include <util/string/subst.h>
+
+namespace NYdb::NTopic {
+
+TTopicClient::TTopicClient(const TDriver& driver, const TTopicClientSettings& settings)
+ : Impl_(std::make_shared<TImpl>(CreateInternalInterface(driver), settings))
+{
+}
+
+TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result)
+ : TStatus(std::move(status))
+ , TopicDescription_(std::move(result))
+{
+}
+
+const TTopicDescription& TDescribeTopicResult::GetTopicDescription() const {
+ return TopicDescription_;
+}
+
+TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
+ : Proto_(std::move(result))
+ , PartitioningSettings_(Proto_.partitioning_settings())
+ , RetentionPeriod_(TDuration::Seconds(Proto_.retention_period().seconds()))
+ , RetentionStorageMb_(Proto_.retention_storage_mb() > 0 ? TMaybe<ui64>(Proto_.retention_storage_mb()) : Nothing())
+ , PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second())
+ , PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes())
+{
+ Owner_ = Proto_.self().owner();
+ PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_);
+ PermissionToSchemeEntry(Proto_.self().effective_permissions(), &EffectivePermissions_);
+
+ for (const auto& part : Proto_.partitions()) {
+ Partitions_.emplace_back(part);
+ }
+ for (const auto& codec : Proto_.supported_codecs().codecs()) {
+ SupportedCodecs_.push_back((ECodec)codec);
+ }
+ for (const auto& pair : Proto_.attributes()) {
+ Attributes_[pair.first] = pair.second;
+ }
+ for (const auto& consumer : Proto_.consumers()) {
+ Consumers_.emplace_back(consumer);
+ }
+}
+
+const TPartitioningSettings& TTopicDescription::GetPartitioningSettings() const {
+ return PartitioningSettings_;
+}
+
+ui32 TTopicDescription::GetTotalPartitionsCount() const {
+ return Partitions_.size();
+}
+
+const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const {
+ return Partitions_;
+}
+
+const TVector<ECodec>& TTopicDescription::GetSupportedCodecs() const {
+ return SupportedCodecs_;
+}
+
+const TDuration& TTopicDescription::GetRetentionPeriod() const {
+ return RetentionPeriod_;
+}
+
+TMaybe<ui64> TTopicDescription::GetRetentionStorageMb() const {
+ return RetentionStorageMb_;
+}
+
+ui64 TTopicDescription::GetPartitionWriteSpeedBytesPerSecond() const {
+ return PartitionWriteSpeedBytesPerSecond_;
+}
+
+ui64 TTopicDescription::GetPartitionWriteBurstBytes() const {
+ return PartitionWriteBurstBytes_;
+}
+
+const TMap<TString, TString>& TTopicDescription::GetAttributes() const {
+ return Attributes_;
+}
+
+const TVector<TConsumer>& TTopicDescription::GetConsumers() const {
+ return Consumers_;
+}
+
+void TTopicDescription::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const {
+ Y_UNUSED(request);
+ Y_FAIL("Not implemented");
+}
+
+const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const {
+ return Proto_;
+}
+
+const TString& TTopicDescription::GetOwner() const {
+ return Owner_;
+}
+
+const TVector<NScheme::TPermissions>& TTopicDescription::GetPermissions() const {
+ return Permissions_;
+}
+
+const TVector<NScheme::TPermissions>& TTopicDescription::GetEffectivePermissions() const {
+ return EffectivePermissions_;
+}
+
+TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings)
+ : MinActivePartitions_(settings.min_active_partitions())
+ , PartitionCountLimit_(settings.partition_count_limit())
+{}
+
+ui64 TPartitioningSettings::GetMinActivePartitions() const {
+ return MinActivePartitions_;
+}
+
+ui64 TPartitioningSettings::GetPartitionCountLimit() const {
+ return PartitionCountLimit_;
+}
+
+TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer)
+ : ConsumerName_(consumer.name())
+ , Important_(consumer.important())
+ , ReadFrom_(TInstant::Seconds(consumer.read_from().seconds()))
+{
+ for (const auto& codec : consumer.supported_codecs().codecs()) {
+ SupportedCodecs_.push_back((ECodec)codec);
+ }
+ for (const auto& pair : consumer.attributes()) {
+ Attributes_[pair.first] = pair.second;
+ }
+}
+
+TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo)
+ : PartitionId_(partitionInfo.partition_id())
+ , Active_(partitionInfo.active())
+{
+ for (const auto& partId : partitionInfo.child_partition_ids()) {
+ ChildPartitionIds_.push_back(partId);
+ }
+
+ for (const auto& partId : partitionInfo.parent_partition_ids()) {
+ ParentPartitionIds_.push_back(partId);
+ }
+}
+
+
+TAsyncStatus TTopicClient::CreateTopic(const TString& path, const TCreateTopicSettings& settings) {
+ return Impl_->CreateTopic(path, settings);
+}
+
+
+TAsyncStatus TTopicClient::AlterTopic(const TString& path, const TAlterTopicSettings& settings) {
+ return Impl_->AlterTopic(path, settings);
+}
+
+TAsyncStatus TTopicClient::DropTopic(const TString& path, const TDropTopicSettings& settings) {
+ return Impl_->DropTopic(path, settings);
+}
+
+TAsyncDescribeTopicResult TTopicClient::DescribeTopic(const TString& path, const TDescribeTopicSettings& settings) {
+ return Impl_->DescribeTopic(path, settings);
+}
+
+} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
new file mode 100644
index 0000000000..524ca596c3
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -0,0 +1,199 @@
+#pragma once
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+
+#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+namespace NYdb::NTopic {
+
+class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
+public:
+ // Constructor for main client.
+ TImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TTopicClientSettings& settings)
+ : TClientImplCommon(std::move(connections), settings)
+ , Settings(settings)
+ {
+ }
+
+ template <class TSettings>
+ static void ConvertConsumerToProto(const TConsumerSettings<TSettings>& settings, Ydb::Topic::Consumer& consumerProto) {
+ consumerProto.set_name(settings.ConsumerName_);
+ consumerProto.set_important(settings.Important_);
+ consumerProto.mutable_read_from()->set_seconds(settings.ReadFrom_.Seconds());
+
+ for (const auto& codec : settings.SupportedCodecs_) {
+ consumerProto.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
+ }
+ for (auto& pair : settings.Attributes_) {
+ (*consumerProto.mutable_attributes())[pair.first] = pair.second;
+ }
+ }
+
+ static void ConvertAlterConsumerToProto(const TAlterConsumerSettings& settings, Ydb::Topic::AlterConsumer& consumerProto) {
+ consumerProto.set_name(settings.ConsumerName_);
+ if (settings.SetImportant_)
+ consumerProto.set_set_important(*settings.SetImportant_);
+ if (settings.SetReadFrom_)
+ consumerProto.mutable_set_read_from()->set_seconds(settings.SetReadFrom_->Seconds());
+
+ if (settings.SetSupportedCodecs_) {
+ for (const auto& codec : *settings.SetSupportedCodecs_) {
+ consumerProto.mutable_set_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
+ }
+ }
+
+ for (auto& pair : settings.AlterAttributes_) {
+ (*consumerProto.mutable_alter_attributes())[pair.first] = pair.second;
+ }
+ }
+
+
+ static Ydb::Topic::CreateTopicRequest MakePropsCreateRequest(const TString& path, const TCreateTopicSettings& settings) {
+ Ydb::Topic::CreateTopicRequest request = MakeOperationRequest<Ydb::Topic::CreateTopicRequest>(settings);
+ request.set_path(path);
+
+ request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions());
+ request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit());
+
+ request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds());
+
+ for (const auto& codec : settings.SupportedCodecs_) {
+ request.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
+ }
+ request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_);
+ request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_);
+ request.set_retention_storage_mb(settings.RetentionStorageMb_);
+
+ for (auto& pair : settings.Attributes_) {
+ (*request.mutable_attributes())[pair.first] = pair.second;
+ }
+
+ for (const auto& consumer : settings.Consumers_) {
+ Ydb::Topic::Consumer& consumerProto = *request.add_consumers();
+ ConvertConsumerToProto(consumer, consumerProto);
+ }
+
+ return request;
+ }
+
+
+ TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings) {
+ auto request = MakePropsCreateRequest(path, settings);
+
+ return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>(
+ std::move(request),
+ &Ydb::Topic::V1::TopicService::Stub::AsyncCreateTopic,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+ }
+
+
+ static Ydb::Topic::AlterTopicRequest MakePropsAlterRequest(const TString& path, const TAlterTopicSettings& settings) {
+ Ydb::Topic::AlterTopicRequest request = MakeOperationRequest<Ydb::Topic::AlterTopicRequest>(settings);
+ request.set_path(path);
+
+ if (settings.AlterPartitioningSettings_) {
+ request.mutable_alter_partitioning_settings()->set_set_min_active_partitions(settings.AlterPartitioningSettings_->GetMinActivePartitions());
+ request.mutable_alter_partitioning_settings()->set_set_partition_count_limit(settings.AlterPartitioningSettings_->GetPartitionCountLimit());
+ }
+ if (settings.SetRetentionPeriod_) {
+ request.mutable_set_retention_period()->set_seconds(settings.SetRetentionPeriod_->Seconds());
+ }
+ if (settings.SetSupportedCodecs_) {
+ for (const auto& codec : *settings.SetSupportedCodecs_) {
+ request.mutable_set_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
+ }
+ }
+ if (settings.SetPartitionWriteSpeedBytesPerSecond_) {
+ request.set_set_partition_write_speed_bytes_per_second(*settings.SetPartitionWriteSpeedBytesPerSecond_);
+ }
+ if (settings.SetPartitionWriteBurstBytes_) {
+ request.set_set_partition_write_burst_bytes(*settings.SetPartitionWriteBurstBytes_);
+ }
+ if (settings.SetRetentionStorageMb_) {
+ request.set_set_retention_storage_mb(*settings.SetRetentionStorageMb_);
+ }
+
+ for (auto& pair : settings.AlterAttributes_) {
+ (*request.mutable_alter_attributes())[pair.first] = pair.second;
+ }
+
+ for (const auto& consumer : settings.AddConsumers_) {
+ Ydb::Topic::Consumer& consumerProto = *request.add_add_consumers();
+ ConvertConsumerToProto(consumer, consumerProto);
+ }
+
+ for (const auto& consumer : settings.DropConsumers_) {
+ request.add_drop_consumers(consumer);
+ }
+
+ for (const auto& consumer : settings.AlterConsumers_) {
+ Ydb::Topic::AlterConsumer& consumerProto = *request.add_alter_consumers();
+ ConvertAlterConsumerToProto(consumer, consumerProto);
+ }
+
+ return request;
+ }
+
+
+ TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& settings) {
+ auto request = MakePropsAlterRequest(path, settings);
+
+ return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>(
+ std::move(request),
+ &Ydb::Topic::V1::TopicService::Stub::AsyncAlterTopic,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+ }
+
+
+ TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Topic::DropTopicRequest>(settings);
+ request.set_path(path);
+
+ return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>(
+ std::move(request),
+ &Ydb::Topic::V1::TopicService::Stub::AsyncDropTopic,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+ }
+
+ TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Topic::DescribeTopicRequest>(settings);
+ request.set_path(path);
+
+ auto promise = NThreading::NewPromise<TDescribeTopicResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Topic::DescribeTopicResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+
+ TDescribeTopicResult val(TStatus(std::move(status)), std::move(result));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Topic::V1::TopicService::Stub::AsyncDescribeTopic,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return promise.GetFuture();
+ }
+
+private:
+ const TTopicClientSettings Settings;
+};
+
+} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
new file mode 100644
index 0000000000..b72ef3c262
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
@@ -0,0 +1,8 @@
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+namespace NYdb {
+ const Ydb::Topic::DescribeTopicResult& TProtoAccessor::GetProto(const NTopic::TTopicDescription& topicDescription) {
+ return topicDescription.GetProto();
+ }
+}// namespace NYdb
+
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
new file mode 100644
index 0000000000..10c84c6f14
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -0,0 +1,437 @@
+#pragma once
+#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/logger/log.h>
+#include <library/cpp/retry/retry_policy.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/size_literals.h>
+#include <util/string/builder.h>
+#include <util/thread/pool.h>
+
+#include <exception>
+#include <variant>
+
+namespace NYdb {
+ class TProtoAccessor;
+
+ namespace NScheme {
+ struct TPermissions;
+ }
+}
+
+namespace NYdb::NTopic {
+
+enum class ECodec : ui32 {
+ RAW = 1,
+ GZIP = 2,
+ LZOP = 3,
+ ZSTD = 4,
+ CUSTOM = 10000,
+};
+
+
+class TConsumer {
+public:
+ TConsumer(const Ydb::Topic::Consumer&);
+
+ const TString& GetConsumerName() const;
+ bool GetImportant() const;
+ const TInstant& GetReadFrom() const;
+ const TVector<ECodec>& GetSupportedCodecs() const;
+ const TMap<TString, TString>& GetAttributes() const;
+private:
+ TString ConsumerName_;
+ bool Important_;
+ TInstant ReadFrom_;
+ TMap<TString, TString> Attributes_;
+ TVector<ECodec> SupportedCodecs_;
+};
+
+class TPartitionInfo {
+public:
+ TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo);
+ ui64 GetPartitionId() const;
+ bool GetActive() const;
+ const TVector<ui64> GetChildPartitionIds() const;
+ const TVector<ui64> GetParentPartitionIds() const;
+
+private:
+ ui64 PartitionId_;
+ bool Active_;
+ TVector<ui64> ChildPartitionIds_;
+ TVector<ui64> ParentPartitionIds_;
+};
+
+
+class TPartitioningSettings {
+public:
+ TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){}
+ TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
+ TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) : MinActivePartitions_(minActivePartitions), PartitionCountLimit_(partitionCountLimit) {}
+
+ ui64 GetMinActivePartitions() const;
+ ui64 GetPartitionCountLimit() const;
+private:
+ ui64 MinActivePartitions_;
+ ui64 PartitionCountLimit_;
+};
+
+class TTopicDescription {
+ friend class NYdb::TProtoAccessor;
+
+public:
+ TTopicDescription(Ydb::Topic::DescribeTopicResult&& desc);
+
+ const TString& GetOwner() const;
+
+ const TVector<NScheme::TPermissions>& GetPermissions() const;
+
+ const TVector<NScheme::TPermissions>& GetEffectivePermissions() const;
+
+ const TPartitioningSettings& GetPartitioningSettings() const;
+
+ ui32 GetTotalPartitionsCount() const;
+
+ const TVector<TPartitionInfo>& GetPartitions() const;
+
+ const TVector<ECodec>& GetSupportedCodecs() const;
+
+ const TDuration& GetRetentionPeriod() const;
+
+ TMaybe<ui64> GetRetentionStorageMb() const;
+
+ ui64 GetPartitionWriteSpeedBytesPerSecond() const;
+
+ ui64 GetPartitionWriteBurstBytes() const;
+
+ const TMap<TString, TString>& GetAttributes() const;
+
+ const TVector<TConsumer>& GetConsumers() const;
+
+ void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const;
+private:
+
+ const Ydb::Topic::DescribeTopicResult& GetProto() const;
+
+ const Ydb::Topic::DescribeTopicResult Proto_;
+ TVector<TPartitionInfo> Partitions_;
+ TVector<ECodec> SupportedCodecs_;
+ TPartitioningSettings PartitioningSettings_;
+ TDuration RetentionPeriod_;
+ TMaybe<ui64> RetentionStorageMb_;
+ ui64 PartitionWriteSpeedBytesPerSecond_;
+ ui64 PartitionWriteBurstBytes_;
+ TMap<TString, TString> Attributes_;
+ TVector<TConsumer> Consumers_;
+
+ TString Owner_;
+ TVector<NScheme::TPermissions> Permissions_;
+ TVector<NScheme::TPermissions> EffectivePermissions_;
+};
+
+
+// Result for describe resource request.
+struct TDescribeTopicResult : public TStatus {
+ friend class NYdb::TProtoAccessor;
+
+
+ TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result);
+
+ const TTopicDescription& GetTopicDescription() const;
+
+private:
+ TTopicDescription TopicDescription_;
+};
+
+using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>;
+
+template <class TSettings>
+class TAlterAttributesBuilderImpl {
+public:
+ TAlterAttributesBuilderImpl(TSettings& parent)
+ : Parent_(parent)
+ { }
+
+ TAlterAttributesBuilderImpl& Alter(const TString& key, const TString& value) {
+ AlterAttributes_[key] = value;
+ return *this;
+ }
+
+ TAlterAttributesBuilderImpl& Add(const TString& key, const TString& value) {
+ return Alter(key, value);
+ }
+
+ TAlterAttributesBuilderImpl& Drop(const TString& key) {
+ return Alter(key, "");
+ }
+
+ TSettings& EndAlterAttributes() { return Parent_; }
+
+private:
+ TSettings& Parent_;
+ THashMap<TString, TString> AlterAttributes_;
+};
+
+
+struct TAlterConsumerSettings;
+struct TAlterTopicSettings;
+struct TCreateTopicSettings;
+
+typedef TAlterAttributesBuilderImpl<TAlterConsumerSettings> TAlterConsumerAttributesBuilder;
+
+typedef TAlterAttributesBuilderImpl<TAlterTopicSettings> TAlterTopicAttributesBuilder;
+
+template<class TSettings>
+struct TConsumerSettings {
+ using TSelf = TConsumerSettings;
+
+ using TAttributes = TMap<TString, TString>;
+
+ TConsumerSettings(TSettings& parent): Parent_(parent) {}
+ TConsumerSettings(TSettings& parent, const TString& name) : ConsumerName_(name), Parent_(parent) {}
+
+ FLUENT_SETTING(TString, ConsumerName);
+ FLUENT_SETTING_DEFAULT(bool, Important, false);
+ FLUENT_SETTING_DEFAULT(TInstant, ReadFrom, TInstant::Zero());
+
+ FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs);
+
+ FLUENT_SETTING(TAttributes, Attributes);
+
+ TConsumerSettings& AddAttribute(const TString& key, const TString& value) {
+ Attributes_[key] = value;
+ return *this;
+ }
+
+ TConsumerSettings& SetAttributes(TMap<TString, TString>&& attributes) {
+ Attributes_ = std::move(attributes);
+ return *this;
+ }
+
+ TConsumerSettings& SetAttributes(const TMap<TString, TString>& attributes) {
+ Attributes_ = attributes;
+ return *this;
+ }
+
+ TConsumerSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) {
+ SupportedCodecs_ = std::move(codecs);
+ return *this;
+ }
+
+ TConsumerSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) {
+ SupportedCodecs_ = codecs;
+ return *this;
+ }
+
+ TSettings& EndAddConsumer() { return Parent_; };
+
+private:
+ TSettings& Parent_;
+};
+
+
+struct TAlterConsumerSettings {
+ using TSelf = TAlterConsumerSettings;
+
+ using TAlterAttributes = TMap<TString, TString>;
+
+ TAlterConsumerSettings(TAlterTopicSettings& parent): Parent_(parent) {}
+ TAlterConsumerSettings(TAlterTopicSettings& parent, const TString& name) : ConsumerName_(name), Parent_(parent) {}
+
+ FLUENT_SETTING(TString, ConsumerName);
+ FLUENT_SETTING_OPTIONAL(bool, SetImportant);
+ FLUENT_SETTING_OPTIONAL(TInstant, SetReadFrom);
+
+ FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs);
+
+ FLUENT_SETTING(TAlterAttributes, AlterAttributes);
+
+ TAlterConsumerAttributesBuilder BeginAlterAttributes() {
+ return TAlterConsumerAttributesBuilder(*this);
+ }
+
+ TAlterConsumerSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) {
+ SetSupportedCodecs_ = std::move(codecs);
+ return *this;
+ }
+
+ TAlterConsumerSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) {
+ SetSupportedCodecs_ = codecs;
+ return *this;
+ }
+
+ TAlterTopicSettings& EndAlterConsumer() { return Parent_; };
+
+private:
+ TAlterTopicSettings& Parent_;
+};
+
+
+struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSettings> {
+
+ using TSelf = TCreateTopicSettings;
+ using TAttributes = TMap<TString, TString>;
+
+ FLUENT_SETTING(TPartitioningSettings, PartitioningSettings);
+
+ FLUENT_SETTING_DEFAULT(TDuration, RetentionPeriod, TDuration::Hours(24));
+
+ FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs);
+
+ FLUENT_SETTING_DEFAULT(ui64, RetentionStorageMb, 0);
+
+ FLUENT_SETTING_DEFAULT(ui64, PartitionWriteSpeedBytesPerSecond, 0);
+ FLUENT_SETTING_DEFAULT(ui64, PartitionWriteBurstBytes, 0);
+
+ FLUENT_SETTING_VECTOR(TConsumerSettings<TCreateTopicSettings>, Consumers);
+
+ FLUENT_SETTING(TAttributes, Attributes);
+
+
+ TCreateTopicSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) {
+ SupportedCodecs_ = std::move(codecs);
+ return *this;
+ }
+
+ TCreateTopicSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) {
+ SupportedCodecs_ = codecs;
+ return *this;
+ }
+
+ TConsumerSettings<TCreateTopicSettings>& BeginAddConsumer() {
+ Consumers_.push_back({*this});
+ return Consumers_.back();
+ }
+
+ TConsumerSettings<TCreateTopicSettings>& BeginAddConsumer(const TString& name) {
+ Consumers_.push_back({*this, name});
+ return Consumers_.back();
+ }
+
+ TCreateTopicSettings& AddAttribute(const TString& key, const TString& value) {
+ Attributes_[key] = value;
+ return *this;
+ }
+
+ TCreateTopicSettings& SetAttributes(TMap<TString, TString>&& attributes) {
+ Attributes_ = std::move(attributes);
+ return *this;
+ }
+
+ TCreateTopicSettings& SetAttributes(const TMap<TString, TString>& attributes) {
+ Attributes_ = attributes;
+ return *this;
+ }
+
+ TCreateTopicSettings& PartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) {
+ PartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit);
+ return *this;
+ }
+};
+
+
+struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSettings> {
+
+ using TSelf = TAlterTopicSettings;
+ using TAlterAttributes = TMap<TString, TString>;
+
+ FLUENT_SETTING_OPTIONAL(TPartitioningSettings, AlterPartitioningSettings);
+
+ FLUENT_SETTING_OPTIONAL(TDuration, SetRetentionPeriod);
+
+ FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs);
+
+ FLUENT_SETTING_OPTIONAL(ui64, SetRetentionStorageMb);
+
+ FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteSpeedBytesPerSecond);
+ FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteBurstBytes);
+
+ FLUENT_SETTING_VECTOR(TConsumerSettings<TAlterTopicSettings>, AddConsumers);
+ FLUENT_SETTING_VECTOR(TString, DropConsumers);
+ FLUENT_SETTING_VECTOR(TAlterConsumerSettings, AlterConsumers);
+
+ FLUENT_SETTING(TAlterAttributes, AlterAttributes);
+
+ TAlterTopicAttributesBuilder BeginAlterAttributes() {
+ return TAlterTopicAttributesBuilder(*this);
+ }
+
+ TAlterTopicSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) {
+ SetSupportedCodecs_ = std::move(codecs);
+ return *this;
+ }
+
+ TAlterTopicSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) {
+ SetSupportedCodecs_ = codecs;
+ return *this;
+ }
+
+ TConsumerSettings<TAlterTopicSettings>& BeginAddConsumer() {
+ AddConsumers_.push_back({*this});
+ return AddConsumers_.back();
+ }
+
+ TConsumerSettings<TAlterTopicSettings>& BeginAddConsumer(const TString& name) {
+ AddConsumers_.push_back({*this, name});
+ return AddConsumers_.back();
+ }
+
+ TAlterConsumerSettings& BeginAlterConsumer() {
+ AlterConsumers_.push_back({*this});
+ return AlterConsumers_.back();
+ }
+
+ TAlterConsumerSettings& BeginAlterConsumer(const TString& name) {
+ AlterConsumers_.push_back({*this, name});
+ return AlterConsumers_.back();
+ }
+
+ TAlterTopicSettings& AlterPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) {
+ AlterPartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit);
+ return *this;
+ }
+};
+
+
+// Settings for drop resource request.
+struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> {};
+
+// Settings for describe resource request.
+struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {};
+
+
+struct TTopicClientSettings : public TCommonClientSettingsBase<TTopicClientSettings> {
+ using TSelf = TTopicClientSettings;
+
+};
+
+// Topic client.
+class TTopicClient {
+public:
+ class TImpl;
+
+ TTopicClient(const TDriver& driver, const TTopicClientSettings& settings = TTopicClientSettings());
+
+ // Create a new topic.
+ TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings = {});
+
+ // Update a topic.
+ TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& = {});
+
+ // Delete a topic.
+ TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& = {});
+
+ // Describe settings of topic.
+ TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {});
+
+private:
+ std::shared_ptr<TImpl> Impl_;
+};
+
+} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h b/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h
index 8747e049f5..cda28a75cf 100644
--- a/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h
+++ b/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h
@@ -42,3 +42,11 @@
name##_.push_back(value); \
return static_cast<TSelf&>(*this); \
}
+
+#define FLUENT_SETTING_OPTIONAL_VECTOR(type, name) \
+ TMaybe<TVector<type>> name##_; \
+ TSelf& Append##name(const type& value) { \
+ if (!name##_) name##_ = TVector<type>{}; \
+ name##_->push_back(value); \
+ return static_cast<TSelf&>(*this); \
+ }
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index f29ee73e16..05c4064d13 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -39,6 +39,7 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
namespace NKikimr::NPersQueueTests {
@@ -3213,11 +3214,80 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response);
UNIT_ASSERT(status.ok());
- Ydb::Topic::DescribeTopicResult res;
- response.operation().result().UnpackTo(&res);
- Cerr << response << "\n" << res << "\n";
+ Ydb::Topic::DescribeTopicResult descrRes;
+ response.operation().result().UnpackTo(&descrRes);
+ Cerr << response << "\n" << descrRes << "\n";
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(res.DebugString(), res1.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString());
+
+
+ {
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort);
+ std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg));
+ auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver);
+
+ auto res = topicClient.DescribeTopic("/Root/PQ/" + topic3);
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription());
+ Cerr << res2 << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res2.DebugString());
+ {
+ NYdb::NTopic::TCreateTopicSettings settings;
+ settings.PartitioningSettings(1,1)
+ .AppendSupportedCodecs((NYdb::NTopic::ECodec)10010)
+ .PartitionWriteSpeedBytesPerSecond(1024)
+ .AppendSupportedCodecs(NYdb::NTopic::ECodec::GZIP)
+ .AddAttribute("_partitions_per_tablet", "10")
+ .BeginAddConsumer("consumer").ReadFrom(TInstant::Seconds(112233))
+ .Important(true)
+ .AddAttribute("_version", "5")
+ .EndAddConsumer()
+ .AppendSupportedCodecs((NYdb::NTopic::ECodec)10011);
+
+ auto res = topicClient.CreateTopic("/Root/PQ/" + topic3 + "2", settings);
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ }
+
+ {
+ NYdb::NTopic::TAlterTopicSettings settings;
+ settings.AlterPartitioningSettings(2,2)
+ .AppendSetSupportedCodecs((NYdb::NTopic::ECodec)10022)
+ .SetPartitionWriteSpeedBytesPerSecond(102400)
+ .SetRetentionPeriod(TDuration::Days(2))
+ .BeginAlterAttributes().Add("_partitions_per_tablet", "")
+ .Drop("_abc_id")
+ .EndAlterAttributes()
+ .BeginAlterConsumer("consumer").SetReadFrom(TInstant::Seconds(1122))
+ .BeginAlterAttributes().Alter("_version", "5")
+ .EndAlterAttributes()
+ .EndAlterConsumer()
+ .AppendSetSupportedCodecs((NYdb::NTopic::ECodec)10020);
+
+ auto res = topicClient.AlterTopic("/Root/PQ/" + topic3 + "2", settings);
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ }
+
+ res = topicClient.DescribeTopic("/Root/PQ/" + topic3 + "2");
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription());
+ Cerr << "ANOTHER TOPIC: " << res2 << "\n";
+ auto& description = res.GetValue().GetTopicDescription();
+ UNIT_ASSERT_VALUES_EQUAL(description.GetTotalPartitionsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), 1);
+ TVector<NYdb::NTopic::ECodec> codecs = {(NYdb::NTopic::ECodec)10022, (NYdb::NTopic::ECodec)10020};
+ UNIT_ASSERT_VALUES_EQUAL(description.GetSupportedCodecs(), codecs);
+ UNIT_ASSERT_VALUES_EQUAL(description.GetEffectivePermissions().size(), 0);
+ }
+
}
{
Ydb::Topic::DropTopicRequest request;
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
index ea0f1efcb2..5f8c3c4d4c 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
@@ -30,6 +30,8 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
cpp-client-ydb_persqueue_public
cpp-client-ydb_table
+ cpp-client-ydb_topic
+ cpp-client-ydb_proto
)
target_link_options(ydb-services-persqueue_v1-ut PRIVATE
-Wl,-no_deduplicate
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
index f8fe0dd9bd..c082f4e70d 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
@@ -32,6 +32,8 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
cpp-client-ydb_persqueue_public
cpp-client-ydb_table
+ cpp-client-ydb_topic
+ cpp-client-ydb_proto
)
target_link_options(ydb-services-persqueue_v1-ut PRIVATE
-ldl