diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-08-31 16:27:21 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-08-31 16:27:21 +0300 |
commit | 0bb12157f3819a28716142d0c47034a829f17657 (patch) | |
tree | 1646035e61b6036d581114f824dc57bbe91f6e14 | |
parent | 4c49249d76fa5c9382f366ce4a8047e02ea11d53 (diff) | |
download | ydb-0bb12157f3819a28716142d0c47034a829f17657.tar.gz |
[] add topic metering mode to cpp sdk
8 files changed, 68 insertions, 13 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt index 02d362f1ce..b5d67ca599 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt @@ -14,6 +14,7 @@ target_link_libraries(client-ydb_persqueue_core-impl PUBLIC cpp-containers-disjoint_interval_tree cpp-grpc-client cpp-monlib-dynamic_counters + cpp-monlib-metrics cpp-string_utils-url library-persqueue-obfuscate api-grpc-draft diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index a8d480b744..4b606e0352 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -2387,57 +2387,57 @@ void TErrorHandler<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseM } } -#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) +#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) template <typename TReaderCounters> void MakeCountersNotNull(TReaderCounters& counters) { if (!counters.Errors) { - counters.Errors = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + counters.Errors = MakeIntrusive<::NMonitoring::TCounterForPtr>(true); } if (!counters.CurrentSessionLifetimeMs) { - counters.CurrentSessionLifetimeMs = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + counters.CurrentSessionLifetimeMs = MakeIntrusive<::NMonitoring::TCounterForPtr>(false); } if (!counters.BytesRead) { - counters.BytesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + counters.BytesRead = MakeIntrusive<::NMonitoring::TCounterForPtr>(true); } if (!counters.MessagesRead) { - counters.MessagesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + counters.MessagesRead = MakeIntrusive<::NMonitoring::TCounterForPtr>(true); } if (!counters.BytesReadCompressed) { - counters.BytesReadCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(true); + counters.BytesReadCompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(true); } if (!counters.BytesInflightUncompressed) { - counters.BytesInflightUncompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + counters.BytesInflightUncompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(false); } if (!counters.BytesInflightCompressed) { - counters.BytesInflightCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + counters.BytesInflightCompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(false); } if (!counters.BytesInflightTotal) { - counters.BytesInflightTotal = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + counters.BytesInflightTotal = MakeIntrusive<::NMonitoring::TCounterForPtr>(false); } if (!counters.MessagesInflight) { - counters.MessagesInflight = MakeIntrusive<NMonitoring::TCounterForPtr>(false); + counters.MessagesInflight = MakeIntrusive<::NMonitoring::TCounterForPtr>(false); } if (!counters.TotalBytesInflightUsageByTime) { - counters.TotalBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + counters.TotalBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); } if (!counters.UncompressedBytesInflightUsageByTime) { - counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); } if (!counters.CompressedBytesInflightUsageByTime) { - counters.CompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); + counters.CompressedBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP); } } diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index b908001ead..b62ad5766e 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -51,6 +51,9 @@ public: static Ydb::Table::ValueSinceUnixEpochModeSettings::Unit GetProto(NTable::TValueSinceUnixEpochModeSettings::EUnit value); static NTable::TValueSinceUnixEpochModeSettings::EUnit FromProto(Ydb::Table::ValueSinceUnixEpochModeSettings::Unit value); + static Ydb::Topic::MeteringMode GetProto(NTopic::EMeteringMode mode); + static NTopic::EMeteringMode FromProto(Ydb::Topic::MeteringMode mode); + // exports & imports template <typename TProtoSettings> static typename TProtoSettings::Scheme GetProto(ES3Scheme value); template <typename TProtoSettings> static ES3Scheme FromProto(typename TProtoSettings::Scheme value); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt index 8a4e2595b3..e138e1085f 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries(client-ydb_topic-impl PUBLIC client-ydb_common_client-impl cpp-client-ydb_driver client-ydb_persqueue_core-impl + cpp-client-ydb_proto ) target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index 0070af0c7f..c82668f104 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -34,6 +34,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) , RetentionStorageMb_(Proto_.retention_storage_mb() > 0 ? TMaybe<ui64>(Proto_.retention_storage_mb()) : Nothing()) , PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second()) , PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes()) + , MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode())) { Owner_ = Proto_.self().owner(); PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_); @@ -118,6 +119,10 @@ ui64 TTopicDescription::GetPartitionWriteBurstBytes() const { return PartitionWriteBurstBytes_; } +EMeteringMode TTopicDescription::GetMeteringMode() const { + return MeteringMode_; +} + const TMap<TString, TString>& TTopicDescription::GetAttributes() const { return Attributes_; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index 1d4cb3098e..15589ab160 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -7,6 +7,7 @@ #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> #include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> @@ -70,6 +71,7 @@ public: request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_); request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_); request.set_retention_storage_mb(settings.RetentionStorageMb_); + request.set_metering_mode(TProtoAccessor::GetProto(settings.MeteringMode_)); for (auto& pair : settings.Attributes_) { (*request.mutable_attributes())[pair.first] = pair.second; @@ -120,6 +122,9 @@ public: if (settings.SetRetentionStorageMb_) { request.set_set_retention_storage_mb(*settings.SetRetentionStorageMb_); } + if (settings.SetMeteringMode_) { + request.set_set_metering_mode(TProtoAccessor::GetProto(*settings.SetMeteringMode_)); + } for (auto& pair : settings.AlterAttributes_) { (*request.mutable_alter_attributes())[pair.first] = pair.second; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp index b72ef3c262..b0e7353d25 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp @@ -4,5 +4,31 @@ namespace NYdb { const Ydb::Topic::DescribeTopicResult& TProtoAccessor::GetProto(const NTopic::TTopicDescription& topicDescription) { return topicDescription.GetProto(); } + + Ydb::Topic::MeteringMode TProtoAccessor::GetProto(NTopic::EMeteringMode mode) { + switch (mode) { + case NTopic::EMeteringMode::Unspecified: + return Ydb::Topic::METERING_MODE_UNSPECIFIED; + case NTopic::EMeteringMode::RequestUnits: + return Ydb::Topic::METERING_MODE_REQUEST_UNITS; + case NTopic::EMeteringMode::ReservedCapacity: + return Ydb::Topic::METERING_MODE_RESERVED_CAPACITY; + case NTopic::EMeteringMode::Unknown: + return Ydb::Topic::METERING_MODE_UNSPECIFIED; + } + } + + NTopic::EMeteringMode TProtoAccessor::FromProto(Ydb::Topic::MeteringMode mode) { + switch (mode) { + case Ydb::Topic::MeteringMode::METERING_MODE_UNSPECIFIED: + return NTopic::EMeteringMode::Unspecified; + case Ydb::Topic::MeteringMode::METERING_MODE_REQUEST_UNITS: + return NTopic::EMeteringMode::RequestUnits; + case Ydb::Topic::MeteringMode::METERING_MODE_RESERVED_CAPACITY: + return NTopic::EMeteringMode::ReservedCapacity; + default: + return NTopic::EMeteringMode::Unknown; + } + } }// namespace NYdb diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 116fb2fb71..a10aaf5956 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -36,6 +36,14 @@ enum class ECodec : ui32 { CUSTOM = 10000, }; +enum class EMeteringMode : ui32 { + Unspecified = 0, + ReservedCapacity = 1, + RequestUnits = 2, + + Unknown = std::numeric_limits<int>::max(), +}; + class TConsumer { public: @@ -120,6 +128,8 @@ private: const Ydb::Topic::DescribeTopicResult& GetProto() const; + EMeteringMode GetMeteringMode() const; + const Ydb::Topic::DescribeTopicResult Proto_; TVector<TPartitionInfo> Partitions_; TVector<ECodec> SupportedCodecs_; @@ -128,6 +138,7 @@ private: TMaybe<ui64> RetentionStorageMb_; ui64 PartitionWriteSpeedBytesPerSecond_; ui64 PartitionWriteBurstBytes_; + EMeteringMode MeteringMode_; TMap<TString, TString> Attributes_; TVector<TConsumer> Consumers_; @@ -285,6 +296,7 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs); FLUENT_SETTING_DEFAULT(ui64, RetentionStorageMb, 0); + FLUENT_SETTING_DEFAULT(EMeteringMode, MeteringMode, EMeteringMode::Unspecified); FLUENT_SETTING_DEFAULT(ui64, PartitionWriteSpeedBytesPerSecond, 0); FLUENT_SETTING_DEFAULT(ui64, PartitionWriteBurstBytes, 0); @@ -352,6 +364,8 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteSpeedBytesPerSecond); FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteBurstBytes); + FLUENT_SETTING_OPTIONAL(EMeteringMode, SetMeteringMode); + FLUENT_SETTING_VECTOR(TConsumerSettings<TAlterTopicSettings>, AddConsumers); FLUENT_SETTING_VECTOR(TString, DropConsumers); FLUENT_SETTING_VECTOR(TAlterConsumerSettings, AlterConsumers); |