aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-08-31 16:27:21 +0300
committershmel1k <shmel1k@ydb.tech>2022-08-31 16:27:21 +0300
commit0bb12157f3819a28716142d0c47034a829f17657 (patch)
tree1646035e61b6036d581114f824dc57bbe91f6e14
parent4c49249d76fa5c9382f366ce4a8047e02ea11d53 (diff)
downloadydb-0bb12157f3819a28716142d0c47034a829f17657.tar.gz
[] add topic metering mode to cpp sdk
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h14
8 files changed, 68 insertions, 13 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt
index 02d362f1ce..b5d67ca599 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.txt
@@ -14,6 +14,7 @@ target_link_libraries(client-ydb_persqueue_core-impl PUBLIC
cpp-containers-disjoint_interval_tree
cpp-grpc-client
cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
cpp-string_utils-url
library-persqueue-obfuscate
api-grpc-draft
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index a8d480b744..4b606e0352 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -2387,57 +2387,57 @@ void TErrorHandler<UseMigrationProtocol>::AbortSession(TASessionClosedEvent<UseM
}
}
-#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
+#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
template <typename TReaderCounters>
void MakeCountersNotNull(TReaderCounters& counters) {
if (!counters.Errors) {
- counters.Errors = MakeIntrusive<NMonitoring::TCounterForPtr>(true);
+ counters.Errors = MakeIntrusive<::NMonitoring::TCounterForPtr>(true);
}
if (!counters.CurrentSessionLifetimeMs) {
- counters.CurrentSessionLifetimeMs = MakeIntrusive<NMonitoring::TCounterForPtr>(false);
+ counters.CurrentSessionLifetimeMs = MakeIntrusive<::NMonitoring::TCounterForPtr>(false);
}
if (!counters.BytesRead) {
- counters.BytesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true);
+ counters.BytesRead = MakeIntrusive<::NMonitoring::TCounterForPtr>(true);
}
if (!counters.MessagesRead) {
- counters.MessagesRead = MakeIntrusive<NMonitoring::TCounterForPtr>(true);
+ counters.MessagesRead = MakeIntrusive<::NMonitoring::TCounterForPtr>(true);
}
if (!counters.BytesReadCompressed) {
- counters.BytesReadCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(true);
+ counters.BytesReadCompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(true);
}
if (!counters.BytesInflightUncompressed) {
- counters.BytesInflightUncompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false);
+ counters.BytesInflightUncompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(false);
}
if (!counters.BytesInflightCompressed) {
- counters.BytesInflightCompressed = MakeIntrusive<NMonitoring::TCounterForPtr>(false);
+ counters.BytesInflightCompressed = MakeIntrusive<::NMonitoring::TCounterForPtr>(false);
}
if (!counters.BytesInflightTotal) {
- counters.BytesInflightTotal = MakeIntrusive<NMonitoring::TCounterForPtr>(false);
+ counters.BytesInflightTotal = MakeIntrusive<::NMonitoring::TCounterForPtr>(false);
}
if (!counters.MessagesInflight) {
- counters.MessagesInflight = MakeIntrusive<NMonitoring::TCounterForPtr>(false);
+ counters.MessagesInflight = MakeIntrusive<::NMonitoring::TCounterForPtr>(false);
}
if (!counters.TotalBytesInflightUsageByTime) {
- counters.TotalBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
+ counters.TotalBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
}
if (!counters.UncompressedBytesInflightUsageByTime) {
- counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
+ counters.UncompressedBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
}
if (!counters.CompressedBytesInflightUsageByTime) {
- counters.CompressedBytesInflightUsageByTime = MakeIntrusive<NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
+ counters.CompressedBytesInflightUsageByTime = MakeIntrusive<::NMonitoring::THistogramCounter>(HISTOGRAM_SETUP);
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
index b908001ead..b62ad5766e 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
@@ -51,6 +51,9 @@ public:
static Ydb::Table::ValueSinceUnixEpochModeSettings::Unit GetProto(NTable::TValueSinceUnixEpochModeSettings::EUnit value);
static NTable::TValueSinceUnixEpochModeSettings::EUnit FromProto(Ydb::Table::ValueSinceUnixEpochModeSettings::Unit value);
+ static Ydb::Topic::MeteringMode GetProto(NTopic::EMeteringMode mode);
+ static NTopic::EMeteringMode FromProto(Ydb::Topic::MeteringMode mode);
+
// exports & imports
template <typename TProtoSettings> static typename TProtoSettings::Scheme GetProto(ES3Scheme value);
template <typename TProtoSettings> static ES3Scheme FromProto(typename TProtoSettings::Scheme value);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
index 8a4e2595b3..e138e1085f 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.txt
@@ -20,6 +20,7 @@ target_link_libraries(client-ydb_topic-impl PUBLIC
client-ydb_common_client-impl
cpp-client-ydb_driver
client-ydb_persqueue_core-impl
+ cpp-client-ydb_proto
)
target_sources(client-ydb_topic-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/executor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index 0070af0c7f..c82668f104 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -34,6 +34,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
, RetentionStorageMb_(Proto_.retention_storage_mb() > 0 ? TMaybe<ui64>(Proto_.retention_storage_mb()) : Nothing())
, PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second())
, PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes())
+ , MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode()))
{
Owner_ = Proto_.self().owner();
PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_);
@@ -118,6 +119,10 @@ ui64 TTopicDescription::GetPartitionWriteBurstBytes() const {
return PartitionWriteBurstBytes_;
}
+EMeteringMode TTopicDescription::GetMeteringMode() const {
+ return MeteringMode_;
+}
+
const TMap<TString, TString>& TTopicDescription::GetAttributes() const {
return Attributes_;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index 1d4cb3098e..15589ab160 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -7,6 +7,7 @@
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
@@ -70,6 +71,7 @@ public:
request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_);
request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_);
request.set_retention_storage_mb(settings.RetentionStorageMb_);
+ request.set_metering_mode(TProtoAccessor::GetProto(settings.MeteringMode_));
for (auto& pair : settings.Attributes_) {
(*request.mutable_attributes())[pair.first] = pair.second;
@@ -120,6 +122,9 @@ public:
if (settings.SetRetentionStorageMb_) {
request.set_set_retention_storage_mb(*settings.SetRetentionStorageMb_);
}
+ if (settings.SetMeteringMode_) {
+ request.set_set_metering_mode(TProtoAccessor::GetProto(*settings.SetMeteringMode_));
+ }
for (auto& pair : settings.AlterAttributes_) {
(*request.mutable_alter_attributes())[pair.first] = pair.second;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
index b72ef3c262..b0e7353d25 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
@@ -4,5 +4,31 @@ namespace NYdb {
const Ydb::Topic::DescribeTopicResult& TProtoAccessor::GetProto(const NTopic::TTopicDescription& topicDescription) {
return topicDescription.GetProto();
}
+
+ Ydb::Topic::MeteringMode TProtoAccessor::GetProto(NTopic::EMeteringMode mode) {
+ switch (mode) {
+ case NTopic::EMeteringMode::Unspecified:
+ return Ydb::Topic::METERING_MODE_UNSPECIFIED;
+ case NTopic::EMeteringMode::RequestUnits:
+ return Ydb::Topic::METERING_MODE_REQUEST_UNITS;
+ case NTopic::EMeteringMode::ReservedCapacity:
+ return Ydb::Topic::METERING_MODE_RESERVED_CAPACITY;
+ case NTopic::EMeteringMode::Unknown:
+ return Ydb::Topic::METERING_MODE_UNSPECIFIED;
+ }
+ }
+
+ NTopic::EMeteringMode TProtoAccessor::FromProto(Ydb::Topic::MeteringMode mode) {
+ switch (mode) {
+ case Ydb::Topic::MeteringMode::METERING_MODE_UNSPECIFIED:
+ return NTopic::EMeteringMode::Unspecified;
+ case Ydb::Topic::MeteringMode::METERING_MODE_REQUEST_UNITS:
+ return NTopic::EMeteringMode::RequestUnits;
+ case Ydb::Topic::MeteringMode::METERING_MODE_RESERVED_CAPACITY:
+ return NTopic::EMeteringMode::ReservedCapacity;
+ default:
+ return NTopic::EMeteringMode::Unknown;
+ }
+ }
}// namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 116fb2fb71..a10aaf5956 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -36,6 +36,14 @@ enum class ECodec : ui32 {
CUSTOM = 10000,
};
+enum class EMeteringMode : ui32 {
+ Unspecified = 0,
+ ReservedCapacity = 1,
+ RequestUnits = 2,
+
+ Unknown = std::numeric_limits<int>::max(),
+};
+
class TConsumer {
public:
@@ -120,6 +128,8 @@ private:
const Ydb::Topic::DescribeTopicResult& GetProto() const;
+ EMeteringMode GetMeteringMode() const;
+
const Ydb::Topic::DescribeTopicResult Proto_;
TVector<TPartitionInfo> Partitions_;
TVector<ECodec> SupportedCodecs_;
@@ -128,6 +138,7 @@ private:
TMaybe<ui64> RetentionStorageMb_;
ui64 PartitionWriteSpeedBytesPerSecond_;
ui64 PartitionWriteBurstBytes_;
+ EMeteringMode MeteringMode_;
TMap<TString, TString> Attributes_;
TVector<TConsumer> Consumers_;
@@ -285,6 +296,7 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs);
FLUENT_SETTING_DEFAULT(ui64, RetentionStorageMb, 0);
+ FLUENT_SETTING_DEFAULT(EMeteringMode, MeteringMode, EMeteringMode::Unspecified);
FLUENT_SETTING_DEFAULT(ui64, PartitionWriteSpeedBytesPerSecond, 0);
FLUENT_SETTING_DEFAULT(ui64, PartitionWriteBurstBytes, 0);
@@ -352,6 +364,8 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting
FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteSpeedBytesPerSecond);
FLUENT_SETTING_OPTIONAL(ui64, SetPartitionWriteBurstBytes);
+ FLUENT_SETTING_OPTIONAL(EMeteringMode, SetMeteringMode);
+
FLUENT_SETTING_VECTOR(TConsumerSettings<TAlterTopicSettings>, AddConsumers);
FLUENT_SETTING_VECTOR(TString, DropConsumers);
FLUENT_SETTING_VECTOR(TAlterConsumerSettings, AlterConsumers);