diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-06-24 18:03:03 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-06-24 18:03:03 +0300 |
commit | d75b40213877270640e39cc7d734a0a4c7902230 (patch) | |
tree | 854c2b17d76434e199a4e2de53c8c1975e1097f7 | |
parent | 4ef1d84f3f2f798b9c95129e716afd50cdb46234 (diff) | |
download | ydb-d75b40213877270640e39cc7d734a0a4c7902230.tar.gz |
ydb_topic SDK for control-plane
ref:19e0ac5211da860b6331c2f320c679533d60741c
-rw-r--r-- | CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_proto/accessor.h | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt | 27 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt | 25 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp | 172 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h | 199 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp | 8 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/topic.h | 437 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h | 8 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 78 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.linux.txt | 2 |
14 files changed, 970 insertions, 5 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index f8b33fb898..2806d3da7a 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1007,6 +1007,8 @@ add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 8a32270867..d3d40e4aee 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1103,6 +1103,8 @@ add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp index 66385d41a8..4c5eb570f5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp @@ -52,10 +52,18 @@ namespace NKikimr::NPersQueueTests { std::optional<TString> codec, std::optional<bool> reconnectOnFailure ) { - Y_UNUSED(codec); auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); + if (codec) { + if (*codec == "raw") + settings.Codec(ECodec::RAW); + if (*codec == "zstd") + settings.Codec(ECodec::ZSTD); + if (*codec == "lzop") + settings.Codec(ECodec::LZOP); + } + settings.MaxMemoryUsage(1024*1024*1024*1024ll); return CreateSimpleWriter(driver, settings); } diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index 88491acab6..2c43474639 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -6,11 +6,13 @@ #include <ydb/public/api/protos/ydb_export.pb.h> #include <ydb/public/api/protos/ydb_import.pb.h> #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> +#include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/public/sdk/cpp/client/ydb_export/export.h> #include <ydb/public/sdk/cpp/client/ydb_import/import.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> namespace NYdb { @@ -35,6 +37,7 @@ public: static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); static const Ydb::PersQueue::V1::DescribeTopicResult& GetProto(const NYdb::NPersQueue::TDescribeTopicResult& topicDescription); + static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt new file mode 100644 index 0000000000..98ca02cc8d --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.txt @@ -0,0 +1,27 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-client-ydb_topic) +target_link_libraries(cpp-client-ydb_topic PUBLIC + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + library-cpp-retry + client-ydb_topic-impl + cpp-client-ydb_proto + cpp-client-ydb_driver +) +target_sources(cpp-client-ydb_topic PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp +) +generate_enum_serilization(cpp-client-ydb_topic + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/topic.h + INCLUDE_HEADERS + ydb/public/sdk/cpp/client/ydb_topic/topic.h +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt new file mode 100644 index 0000000000..24edb06472 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt @@ -0,0 +1,25 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_topic-impl) +target_link_libraries(client-ydb_topic-impl PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-client + cpp-monlib-dynamic_counters + cpp-string_utils-url + library-persqueue-obfuscate + api-grpc-draft + impl-ydb_internal-make_request + client-ydb_common_client-impl + cpp-client-ydb_driver +) +target_sources(client-ydb_topic-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp new file mode 100644 index 0000000000..473d473878 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -0,0 +1,172 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h> + +#include <ydb/library/persqueue/obfuscate/obfuscate.h> + +#include <util/random/random.h> +#include <util/string/cast.h> +#include <util/string/subst.h> + +namespace NYdb::NTopic { + +TTopicClient::TTopicClient(const TDriver& driver, const TTopicClientSettings& settings) + : Impl_(std::make_shared<TImpl>(CreateInternalInterface(driver), settings)) +{ +} + +TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result) + : TStatus(std::move(status)) + , TopicDescription_(std::move(result)) +{ +} + +const TTopicDescription& TDescribeTopicResult::GetTopicDescription() const { + return TopicDescription_; +} + +TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) + : Proto_(std::move(result)) + , PartitioningSettings_(Proto_.partitioning_settings()) + , RetentionPeriod_(TDuration::Seconds(Proto_.retention_period().seconds())) + , RetentionStorageMb_(Proto_.retention_storage_mb() > 0 ? TMaybe<ui64>(Proto_.retention_storage_mb()) : Nothing()) + , PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second()) + , PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes()) +{ + Owner_ = Proto_.self().owner(); + PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_); + PermissionToSchemeEntry(Proto_.self().effective_permissions(), &EffectivePermissions_); + + for (const auto& part : Proto_.partitions()) { + Partitions_.emplace_back(part); + } + for (const auto& codec : Proto_.supported_codecs().codecs()) { + SupportedCodecs_.push_back((ECodec)codec); + } + for (const auto& pair : Proto_.attributes()) { + Attributes_[pair.first] = pair.second; + } + for (const auto& consumer : Proto_.consumers()) { + Consumers_.emplace_back(consumer); + } +} + +const TPartitioningSettings& TTopicDescription::GetPartitioningSettings() const { + return PartitioningSettings_; +} + +ui32 TTopicDescription::GetTotalPartitionsCount() const { + return Partitions_.size(); +} + +const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const { + return Partitions_; +} + +const TVector<ECodec>& TTopicDescription::GetSupportedCodecs() const { + return SupportedCodecs_; +} + +const TDuration& TTopicDescription::GetRetentionPeriod() const { + return RetentionPeriod_; +} + +TMaybe<ui64> TTopicDescription::GetRetentionStorageMb() const { + return RetentionStorageMb_; +} + +ui64 TTopicDescription::GetPartitionWriteSpeedBytesPerSecond() const { + return PartitionWriteSpeedBytesPerSecond_; +} + +ui64 TTopicDescription::GetPartitionWriteBurstBytes() const { + return PartitionWriteBurstBytes_; +} + +const TMap<TString, TString>& TTopicDescription::GetAttributes() const { + return Attributes_; +} + +const TVector<TConsumer>& TTopicDescription::GetConsumers() const { + return Consumers_; +} + +void TTopicDescription::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const { + Y_UNUSED(request); + Y_FAIL("Not implemented"); +} + +const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const { + return Proto_; +} + +const TString& TTopicDescription::GetOwner() const { + return Owner_; +} + +const TVector<NScheme::TPermissions>& TTopicDescription::GetPermissions() const { + return Permissions_; +} + +const TVector<NScheme::TPermissions>& TTopicDescription::GetEffectivePermissions() const { + return EffectivePermissions_; +} + +TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings) + : MinActivePartitions_(settings.min_active_partitions()) + , PartitionCountLimit_(settings.partition_count_limit()) +{} + +ui64 TPartitioningSettings::GetMinActivePartitions() const { + return MinActivePartitions_; +} + +ui64 TPartitioningSettings::GetPartitionCountLimit() const { + return PartitionCountLimit_; +} + +TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer) + : ConsumerName_(consumer.name()) + , Important_(consumer.important()) + , ReadFrom_(TInstant::Seconds(consumer.read_from().seconds())) +{ + for (const auto& codec : consumer.supported_codecs().codecs()) { + SupportedCodecs_.push_back((ECodec)codec); + } + for (const auto& pair : consumer.attributes()) { + Attributes_[pair.first] = pair.second; + } +} + +TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo) + : PartitionId_(partitionInfo.partition_id()) + , Active_(partitionInfo.active()) +{ + for (const auto& partId : partitionInfo.child_partition_ids()) { + ChildPartitionIds_.push_back(partId); + } + + for (const auto& partId : partitionInfo.parent_partition_ids()) { + ParentPartitionIds_.push_back(partId); + } +} + + +TAsyncStatus TTopicClient::CreateTopic(const TString& path, const TCreateTopicSettings& settings) { + return Impl_->CreateTopic(path, settings); +} + + +TAsyncStatus TTopicClient::AlterTopic(const TString& path, const TAlterTopicSettings& settings) { + return Impl_->AlterTopic(path, settings); +} + +TAsyncStatus TTopicClient::DropTopic(const TString& path, const TDropTopicSettings& settings) { + return Impl_->DropTopic(path, settings); +} + +TAsyncDescribeTopicResult TTopicClient::DescribeTopic(const TString& path, const TDescribeTopicSettings& settings) { + return Impl_->DescribeTopic(path, settings); +} + +} // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h new file mode 100644 index 0000000000..524ca596c3 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -0,0 +1,199 @@ +#pragma once + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> + +#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +namespace NYdb::NTopic { + +class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> { +public: + // Constructor for main client. + TImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TTopicClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + , Settings(settings) + { + } + + template <class TSettings> + static void ConvertConsumerToProto(const TConsumerSettings<TSettings>& settings, Ydb::Topic::Consumer& consumerProto) { + consumerProto.set_name(settings.ConsumerName_); + consumerProto.set_important(settings.Important_); + consumerProto.mutable_read_from()->set_seconds(settings.ReadFrom_.Seconds()); + + for (const auto& codec : settings.SupportedCodecs_) { + consumerProto.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec))); + } + for (auto& pair : settings.Attributes_) { + (*consumerProto.mutable_attributes())[pair.first] = pair.second; + } + } + + static void ConvertAlterConsumerToProto(const TAlterConsumerSettings& settings, Ydb::Topic::AlterConsumer& consumerProto) { + consumerProto.set_name(settings.ConsumerName_); + if (settings.SetImportant_) + consumerProto.set_set_important(*settings.SetImportant_); + if (settings.SetReadFrom_) + consumerProto.mutable_set_read_from()->set_seconds(settings.SetReadFrom_->Seconds()); + + if (settings.SetSupportedCodecs_) { + for (const auto& codec : *settings.SetSupportedCodecs_) { + consumerProto.mutable_set_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec))); + } + } + + for (auto& pair : settings.AlterAttributes_) { + (*consumerProto.mutable_alter_attributes())[pair.first] = pair.second; + } + } + + + static Ydb::Topic::CreateTopicRequest MakePropsCreateRequest(const TString& path, const TCreateTopicSettings& settings) { + Ydb::Topic::CreateTopicRequest request = MakeOperationRequest<Ydb::Topic::CreateTopicRequest>(settings); + request.set_path(path); + + request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions()); + request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit()); + + request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds()); + + for (const auto& codec : settings.SupportedCodecs_) { + request.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec))); + } + request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_); + request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_); + request.set_retention_storage_mb(settings.RetentionStorageMb_); + + for (auto& pair : settings.Attributes_) { + (*request.mutable_attributes())[pair.first] = pair.second; + } + + for (const auto& consumer : settings.Consumers_) { + Ydb::Topic::Consumer& consumerProto = *request.add_consumers(); + ConvertConsumerToProto(consumer, consumerProto); + } + + return request; + } + + + TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings) { + auto request = MakePropsCreateRequest(path, settings); + + return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>( + std::move(request), + &Ydb::Topic::V1::TopicService::Stub::AsyncCreateTopic, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + } + + + static Ydb::Topic::AlterTopicRequest MakePropsAlterRequest(const TString& path, const TAlterTopicSettings& settings) { + Ydb::Topic::AlterTopicRequest request = MakeOperationRequest<Ydb::Topic::AlterTopicRequest>(settings); + request.set_path(path); + + if (settings.AlterPartitioningSettings_) { + request.mutable_alter_partitioning_settings()->set_set_min_active_partitions(settings.AlterPartitioningSettings_->GetMinActivePartitions()); + request.mutable_alter_partitioning_settings()->set_set_partition_count_limit(settings.AlterPartitioningSettings_->GetPartitionCountLimit()); + } + if (settings.SetRetentionPeriod_) { + request.mutable_set_retention_period()->set_seconds(settings.SetRetentionPeriod_->Seconds()); + } + if (settings.SetSupportedCodecs_) { + for (const auto& codec : *settings.SetSupportedCodecs_) { + request.mutable_set_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec))); + } + } + if (settings.SetPartitionWriteSpeedBytesPerSecond_) { + request.set_set_partition_write_speed_bytes_per_second(*settings.SetPartitionWriteSpeedBytesPerSecond_); + } + if (settings.SetPartitionWriteBurstBytes_) { + request.set_set_partition_write_burst_bytes(*settings.SetPartitionWriteBurstBytes_); + } + if (settings.SetRetentionStorageMb_) { + request.set_set_retention_storage_mb(*settings.SetRetentionStorageMb_); + } + + for (auto& pair : settings.AlterAttributes_) { + (*request.mutable_alter_attributes())[pair.first] = pair.second; + } + + for (const auto& consumer : settings.AddConsumers_) { + Ydb::Topic::Consumer& consumerProto = *request.add_add_consumers(); + ConvertConsumerToProto(consumer, consumerProto); + } + + for (const auto& consumer : settings.DropConsumers_) { + request.add_drop_consumers(consumer); + } + + for (const auto& consumer : settings.AlterConsumers_) { + Ydb::Topic::AlterConsumer& consumerProto = *request.add_alter_consumers(); + ConvertAlterConsumerToProto(consumer, consumerProto); + } + + return request; + } + + + TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& settings) { + auto request = MakePropsAlterRequest(path, settings); + + return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>( + std::move(request), + &Ydb::Topic::V1::TopicService::Stub::AsyncAlterTopic, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + } + + + TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings) { + auto request = MakeOperationRequest<Ydb::Topic::DropTopicRequest>(settings); + request.set_path(path); + + return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>( + std::move(request), + &Ydb::Topic::V1::TopicService::Stub::AsyncDropTopic, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + } + + TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings) { + auto request = MakeOperationRequest<Ydb::Topic::DescribeTopicRequest>(settings); + request.set_path(path); + + auto promise = NThreading::NewPromise<TDescribeTopicResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Topic::DescribeTopicResult result; + if (any) { + any->UnpackTo(&result); + } + + TDescribeTopicResult val(TStatus(std::move(status)), std::move(result)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>( + std::move(request), + extractor, + &Ydb::Topic::V1::TopicService::Stub::AsyncDescribeTopic, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return promise.GetFuture(); + } + +private: + const TTopicClientSettings Settings; +}; + +} // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp new file mode 100644 index 0000000000..b72ef3c262 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp @@ -0,0 +1,8 @@ +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +namespace NYdb { + const Ydb::Topic::DescribeTopicResult& TProtoAccessor::GetProto(const NTopic::TTopicDescription& topicDescription) { + return topicDescription.GetProto(); + } +}// namespace NYdb + diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h new file mode 100644 index 0000000000..10c84c6f14 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -0,0 +1,437 @@ +#pragma once +#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/logger/log.h> +#include <library/cpp/retry/retry_policy.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/size_literals.h> +#include <util/string/builder.h> +#include <util/thread/pool.h> + +#include <exception> +#include <variant> + +namespace NYdb { + class TProtoAccessor; + + namespace NScheme { + struct TPermissions; + } +} + +namespace NYdb::NTopic { + +enum class ECodec : ui32 { + RAW = 1, + GZIP = 2, + LZOP = 3, + ZSTD = 4, + CUSTOM = 10000, +}; + + +class TConsumer { +public: + TConsumer(const Ydb::Topic::Consumer&); + + const TString& GetConsumerName() const; + bool GetImportant() const; + const TInstant& GetReadFrom() const; + const TVector<ECodec>& GetSupportedCodecs() const; + const TMap<TString, TString>& GetAttributes() const; +private: + TString ConsumerName_; + bool Important_; + TInstant ReadFrom_; + TMap<TString, TString> Attributes_; + TVector<ECodec> SupportedCodecs_; +}; + +class TPartitionInfo { +public: + TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo); + ui64 GetPartitionId() const; + bool GetActive() const; + const TVector<ui64> GetChildPartitionIds() const; + const TVector<ui64> GetParentPartitionIds() const; + +private: + ui64 PartitionId_; + bool Active_; + TVector<ui64> ChildPartitionIds_; + TVector<ui64> ParentPartitionIds_; +}; + + +class TPartitioningSettings { +public: + TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){} + TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings); + TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) : MinActivePartitions_(minActivePartitions), PartitionCountLimit_(partitionCountLimit) {} + + ui64 GetMinActivePartitions() const; + ui64 GetPartitionCountLimit() const; +private: + ui64 MinActivePartitions_; + ui64 PartitionCountLimit_; +}; + +class TTopicDescription { + friend class NYdb::TProtoAccessor; + +public: + TTopicDescription(Ydb::Topic::DescribeTopicResult&& desc); + + const TString& GetOwner() const; + + const TVector<NScheme::TPermissions>& GetPermissions() const; + + const TVector<NScheme::TPermissions>& GetEffectivePermissions() const; + + const TPartitioningSettings& GetPartitioningSettings() const; + + ui32 GetTotalPartitionsCount() const; + + const TVector<TPartitionInfo>& GetPartitions() const; + + const TVector<ECodec>& GetSupportedCodecs() const; + + const TDuration& GetRetentionPeriod() const; + + TMaybe<ui64> GetRetentionStorageMb() const; + + ui64 GetPartitionWriteSpeedBytesPerSecond() const; + + ui64 GetPartitionWriteBurstBytes() const; + + const TMap<TString, TString>& GetAttributes() const; + + const TVector<TConsumer>& GetConsumers() const; + + void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const; +private: + + const Ydb::Topic::DescribeTopicResult& GetProto() const; + + const Ydb::Topic::DescribeTopicResult Proto_; + TVector<TPartitionInfo> Partitions_; + TVector<ECodec> SupportedCodecs_; + TPartitioningSettings PartitioningSettings_; + TDuration RetentionPeriod_; + TMaybe<ui64> RetentionStorageMb_; + ui64 PartitionWriteSpeedBytesPerSecond_; + ui64 PartitionWriteBurstBytes_; + TMap<TString, TString> Attributes_; + TVector<TConsumer> Consumers_; + + TString Owner_; + TVector<NScheme::TPermissions> Permissions_; + TVector<NScheme::TPermissions> EffectivePermissions_; +}; + + +// Result for describe resource request. +struct TDescribeTopicResult : public TStatus { + friend class NYdb::TProtoAccessor; + + + TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result); + + const TTopicDescription& GetTopicDescription() const; + +private: + TTopicDescription TopicDescription_; +}; + +using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>; + +template <class TSettings> +class TAlterAttributesBuilderImpl { +public: + TAlterAttributesBuilderImpl(TSettings& parent) + : Parent_(parent) + { } + + TAlterAttributesBuilderImpl& Alter(const TString& key, const TString& value) { + AlterAttributes_[key] = value; + return *this; + } + + TAlterAttributesBuilderImpl& Add(const TString& key, const TString& value) { + return Alter(key, value); + } + + TAlterAttributesBuilderImpl& Drop(const TString& key) { + return Alter(key, ""); + } + + TSettings& EndAlterAttributes() { return Parent_; } + +private: + TSettings& Parent_; + THashMap<TString, TString> AlterAttributes_; +}; + + +struct TAlterConsumerSettings; +struct TAlterTopicSettings; +struct TCreateTopicSettings; + +typedef TAlterAttributesBuilderImpl<TAlterConsumerSettings> TAlterConsumerAttributesBuilder; + +typedef TAlterAttributesBuilderImpl<TAlterTopicSettings> TAlterTopicAttributesBuilder; + +template<class TSettings> +struct TConsumerSettings { + using TSelf = TConsumerSettings; + + using TAttributes = TMap<TString, TString>; + + TConsumerSettings(TSettings& parent): Parent_(parent) {} + TConsumerSettings(TSettings& parent, const TString& name) : ConsumerName_(name), Parent_(parent) {} + + FLUENT_SETTING(TString, ConsumerName); + FLUENT_SETTING_DEFAULT(bool, Important, false); + FLUENT_SETTING_DEFAULT(TInstant, ReadFrom, TInstant::Zero()); + + FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs); + + FLUENT_SETTING(TAttributes, Attributes); + + TConsumerSettings& AddAttribute(const TString& key, const TString& value) { + Attributes_[key] = value; + return *this; + } + + TConsumerSettings& SetAttributes(TMap<TString, TString>&& attributes) { + Attributes_ = std::move(attributes); + return *this; + } + + TConsumerSettings& SetAttributes(const TMap<TString, TString>& attributes) { + Attributes_ = attributes; + return *this; + } + + TConsumerSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) { + SupportedCodecs_ = std::move(codecs); + return *this; + } + + TConsumerSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) { + SupportedCodecs_ = codecs; + return *this; + } + + TSettings& EndAddConsumer() { return Parent_; }; + +private: + TSettings& Parent_; +}; + + +struct TAlterConsumerSettings { + using TSelf = TAlterConsumerSettings; + + using TAlterAttributes = TMap<TString, TString>; + + TAlterConsumerSettings(TAlterTopicSettings& parent): Parent_(parent) {} + TAlterConsumerSettings(TAlterTopicSettings& parent, const TString& name) : ConsumerName_(name), Parent_(parent) {} + + FLUENT_SETTING(TString, ConsumerName); + FLUENT_SETTING_OPTIONAL(bool, SetImportant); + FLUENT_SETTING_OPTIONAL(TInstant, SetReadFrom); + + FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs); + + FLUENT_SETTING(TAlterAttributes, AlterAttributes); + + TAlterConsumerAttributesBuilder BeginAlterAttributes() { + return TAlterConsumerAttributesBuilder(*this); + } + + TAlterConsumerSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) { + SetSupportedCodecs_ = std::move(codecs); + return *this; + } + + TAlterConsumerSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) { + SetSupportedCodecs_ = codecs; + return *this; + } + + TAlterTopicSettings& EndAlterConsumer() { return Parent_; }; + +private: + TAlterTopicSettings& Parent_; +}; + + +struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSettings> { + + using TSelf = TCreateTopicSettings; + using TAttributes = TMap<TString, TString>; + + FLUENT_SETTING(TPartitioningSettings, PartitioningSettings); + + FLUENT_SETTING_DEFAULT(TDuration, RetentionPeriod, TDuration::Hours(24)); + + FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs); + + FLUENT_SETTING_DEFAULT(ui64, RetentionStorageMb, 0); + + FLUENT_SETTING_DEFAULT(ui64, PartitionWriteSpeedBytesPerSecond, 0); + FLUENT_SETTING_DEFAULT(ui64, PartitionWriteBurstBytes, 0); + + FLUENT_SETTING_VECTOR(TConsumerSettings<TCreateTopicSettings>, Consumers); + + FLUENT_SETTING(TAttributes, Attributes); + + + TCreateTopicSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) { + SupportedCodecs_ = std::move(codecs); + return *this; + } + + TCreateTopicSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) { + SupportedCodecs_ = codecs; + return *this; + } + + TConsumerSettings<TCreateTopicSettings>& BeginAddConsumer() { + Consumers_.push_back({*this}); + return Consumers_.back(); + } + + TConsumerSettings<TCreateTopicSettings>& BeginAddConsumer(const TString& name) { + Consumers_.push_back({*this, name}); + return Consumers_.back(); + } + + TCreateTopicSettings& AddAttribute(const TString& key, const TString& value) { + Attributes_[key] = value; + return *this; + } + + TCreateTopicSettings& SetAttributes(TMap<TString, TString>&& attributes) { + Attributes_ = std::move(attributes); + return *this; + } + + TCreateTopicSettings& SetAttributes(const TMap<TString, TString>& attributes) { + Attributes_ = attributes; + return *this; + } + + TCreateTopicSettings& PartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) { + PartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit); + return *this; + } +}; + + +struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSettings> { + + using TSelf = TAlterTopicSettings; + using TAlterAttributes = TMap<TString, TString>; + + FLUENT_SETTING_OPTIONAL(TPartitioningSettings, AlterPartitioningSettings); + + FLUENT_SETTING_OPTIONAL(TDuration, SetRetentionPeriod); + + FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs); + + FLUENT_SETTING_OPTIONAL(ui64, SetRetentionStorageMb); + + FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteSpeedBytesPerSecond); + FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteBurstBytes); + + FLUENT_SETTING_VECTOR(TConsumerSettings<TAlterTopicSettings>, AddConsumers); + FLUENT_SETTING_VECTOR(TString, DropConsumers); + FLUENT_SETTING_VECTOR(TAlterConsumerSettings, AlterConsumers); + + FLUENT_SETTING(TAlterAttributes, AlterAttributes); + + TAlterTopicAttributesBuilder BeginAlterAttributes() { + return TAlterTopicAttributesBuilder(*this); + } + + TAlterTopicSettings& SetSuportedCodecs(TVector<ECodec>&& codecs) { + SetSupportedCodecs_ = std::move(codecs); + return *this; + } + + TAlterTopicSettings& SetSuportedCodecs(const TVector<ECodec>& codecs) { + SetSupportedCodecs_ = codecs; + return *this; + } + + TConsumerSettings<TAlterTopicSettings>& BeginAddConsumer() { + AddConsumers_.push_back({*this}); + return AddConsumers_.back(); + } + + TConsumerSettings<TAlterTopicSettings>& BeginAddConsumer(const TString& name) { + AddConsumers_.push_back({*this, name}); + return AddConsumers_.back(); + } + + TAlterConsumerSettings& BeginAlterConsumer() { + AlterConsumers_.push_back({*this}); + return AlterConsumers_.back(); + } + + TAlterConsumerSettings& BeginAlterConsumer(const TString& name) { + AlterConsumers_.push_back({*this, name}); + return AlterConsumers_.back(); + } + + TAlterTopicSettings& AlterPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) { + AlterPartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit); + return *this; + } +}; + + +// Settings for drop resource request. +struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> {}; + +// Settings for describe resource request. +struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {}; + + +struct TTopicClientSettings : public TCommonClientSettingsBase<TTopicClientSettings> { + using TSelf = TTopicClientSettings; + +}; + +// Topic client. +class TTopicClient { +public: + class TImpl; + + TTopicClient(const TDriver& driver, const TTopicClientSettings& settings = TTopicClientSettings()); + + // Create a new topic. + TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings = {}); + + // Update a topic. + TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& = {}); + + // Delete a topic. + TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& = {}); + + // Describe settings of topic. + TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {}); + +private: + std::shared_ptr<TImpl> Impl_; +}; + +} // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h b/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h index 8747e049f5..cda28a75cf 100644 --- a/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h +++ b/ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h @@ -42,3 +42,11 @@ name##_.push_back(value); \ return static_cast<TSelf&>(*this); \ } + +#define FLUENT_SETTING_OPTIONAL_VECTOR(type, name) \ + TMaybe<TVector<type>> name##_; \ + TSelf& Append##name(const type& value) { \ + if (!name##_) name##_ = TVector<type>{}; \ + name##_->push_back(value); \ + return static_cast<TSelf&>(*this); \ + } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index f29ee73e16..05c4064d13 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -39,6 +39,7 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> namespace NKikimr::NPersQueueTests { @@ -3213,11 +3214,80 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response); UNIT_ASSERT(status.ok()); - Ydb::Topic::DescribeTopicResult res; - response.operation().result().UnpackTo(&res); - Cerr << response << "\n" << res << "\n"; + Ydb::Topic::DescribeTopicResult descrRes; + response.operation().result().UnpackTo(&descrRes); + Cerr << response << "\n" << descrRes << "\n"; UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(res.DebugString(), res1.DebugString()); + UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString()); + + + { + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort); + std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg)); + auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver); + + auto res = topicClient.DescribeTopic("/Root/PQ/" + topic3); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription()); + Cerr << res2 << "\n"; + UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res2.DebugString()); + { + NYdb::NTopic::TCreateTopicSettings settings; + settings.PartitioningSettings(1,1) + .AppendSupportedCodecs((NYdb::NTopic::ECodec)10010) + .PartitionWriteSpeedBytesPerSecond(1024) + .AppendSupportedCodecs(NYdb::NTopic::ECodec::GZIP) + .AddAttribute("_partitions_per_tablet", "10") + .BeginAddConsumer("consumer").ReadFrom(TInstant::Seconds(112233)) + .Important(true) + .AddAttribute("_version", "5") + .EndAddConsumer() + .AppendSupportedCodecs((NYdb::NTopic::ECodec)10011); + + auto res = topicClient.CreateTopic("/Root/PQ/" + topic3 + "2", settings); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + } + + { + NYdb::NTopic::TAlterTopicSettings settings; + settings.AlterPartitioningSettings(2,2) + .AppendSetSupportedCodecs((NYdb::NTopic::ECodec)10022) + .SetPartitionWriteSpeedBytesPerSecond(102400) + .SetRetentionPeriod(TDuration::Days(2)) + .BeginAlterAttributes().Add("_partitions_per_tablet", "") + .Drop("_abc_id") + .EndAlterAttributes() + .BeginAlterConsumer("consumer").SetReadFrom(TInstant::Seconds(1122)) + .BeginAlterAttributes().Alter("_version", "5") + .EndAlterAttributes() + .EndAlterConsumer() + .AppendSetSupportedCodecs((NYdb::NTopic::ECodec)10020); + + auto res = topicClient.AlterTopic("/Root/PQ/" + topic3 + "2", settings); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + } + + res = topicClient.DescribeTopic("/Root/PQ/" + topic3 + "2"); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription()); + Cerr << "ANOTHER TOPIC: " << res2 << "\n"; + auto& description = res.GetValue().GetTopicDescription(); + UNIT_ASSERT_VALUES_EQUAL(description.GetTotalPartitionsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), 1); + TVector<NYdb::NTopic::ECodec> codecs = {(NYdb::NTopic::ECodec)10022, (NYdb::NTopic::ECodec)10020}; + UNIT_ASSERT_VALUES_EQUAL(description.GetSupportedCodecs(), codecs); + UNIT_ASSERT_VALUES_EQUAL(description.GetEffectivePermissions().size(), 0); + } + } { Ydb::Topic::DropTopicRequest request; diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt index ea0f1efcb2..5f8c3c4d4c 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt @@ -30,6 +30,8 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC ydb_persqueue_core-ut-ut_utils cpp-client-ydb_persqueue_public cpp-client-ydb_table + cpp-client-ydb_topic + cpp-client-ydb_proto ) target_link_options(ydb-services-persqueue_v1-ut PRIVATE -Wl,-no_deduplicate diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt index f8fe0dd9bd..c082f4e70d 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt @@ -32,6 +32,8 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC ydb_persqueue_core-ut-ut_utils cpp-client-ydb_persqueue_public cpp-client-ydb_table + cpp-client-ydb_topic + cpp-client-ydb_proto ) target_link_options(ydb-services-persqueue_v1-ut PRIVATE -ldl |