aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-02-11 13:54:17 +0300
committerGitHub <noreply@github.com>2025-02-11 13:54:17 +0300
commit4b90b10cf60b38930d8f17881961b780d00dc73b (patch)
tree86f6c721255b2a3424a62e98c823658df2b3d6b4
parent1d9eb5a4d7146fc335de85453b42ddda0a997577 (diff)
downloadydb-4b90b10cf60b38930d8f17881961b780d00dc73b.tar.gz
Support all rate limiter's properties in cpp sdk (#14405)
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h145
-rw-r--r--ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp162
2 files changed, 304 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h
index 836d423154..2f03acc9f3 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h
@@ -2,14 +2,52 @@
#include <ydb-cpp-sdk/client/driver/driver.h>
+#include <chrono>
+#include <unordered_map>
+#include <variant>
+
namespace Ydb::RateLimiter {
-class CreateResourceRequest;
-class DescribeResourceResult;
-class HierarchicalDrrSettings;
+ class CreateResourceRequest;
+ class DescribeResourceResult;
+ class HierarchicalDrrSettings;
+ class ReplicatedBucketSettings;
+ class MeteringConfig;
+ class MeteringConfig_Metric;
} // namespace Ydb::RateLimiter
namespace NYdb::inline V3::NRateLimiter {
+struct TReplicatedBucketSettings {
+ using TSelf = TReplicatedBucketSettings;
+
+ TReplicatedBucketSettings() = default;
+ TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings&);
+
+ void SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings&) const;
+
+ // Interval between syncs from kesus and between consumption reports.
+ // Default value equals 5000 ms and not inherited.
+ FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportInterval);
+};
+
+class TLeafBehavior {
+public:
+ enum EBehavior {
+ REPLICATED_BUCKET,
+ };
+
+ EBehavior GetBehavior() const;
+
+ TLeafBehavior(const TReplicatedBucketSettings&);
+ TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings&);
+ const TReplicatedBucketSettings& GetReplicatedBucket() const;
+
+ void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const;
+
+private:
+ std::variant<TReplicatedBucketSettings> BehaviorSettings_;
+};
+
// Settings for hierarchical deficit round robin (HDRR) algorithm.
template <class TDerived>
struct THierarchicalDrrSettings {
@@ -46,6 +84,91 @@ struct THierarchicalDrrSettings {
// Default value is inherited from parent or 0.75 for root.
// Must be nonnegative and less than or equal to 1.
FLUENT_SETTING_OPTIONAL(double, PrefetchWatermark);
+
+ // Prevents bucket from going too deep in negative values. If somebody reports value that will exceed
+ // this limit the final amount in bucket will be equal to this limit.
+ // Should be negative value.
+ // Unset means no limit.
+ FLUENT_SETTING_OPTIONAL(double, ImmediatelyFillUpTo);
+
+ // Behavior of leafs in tree.
+ // Not inherited.
+ FLUENT_SETTING_OPTIONAL(TLeafBehavior, LeafBehavior);
+};
+
+struct TMetric {
+ using TSelf = TMetric;
+ using TLabels = std::unordered_map<std::string, std::string>;
+
+ TMetric() = default;
+ TMetric(const Ydb::RateLimiter::MeteringConfig_Metric&);
+
+ void SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric&) const;
+
+ // Send this metric to billing.
+ // Default value is false (not inherited).
+ FLUENT_SETTING_DEFAULT(bool, Enabled, false);
+
+ // Billing metric period (aligned to hour boundary).
+ // Default value is inherited from parent or equals 60 seconds for root.
+ FLUENT_SETTING_OPTIONAL(std::chrono::seconds, BillingPeriod);
+
+ // User-defined labels.
+ FLUENT_SETTING(TLabels, Labels);
+
+ // Billing metric JSON fields (inherited from parent if not set)
+ FLUENT_SETTING(std::string, MetricFieldsJson);
+};
+
+struct TMeteringConfig {
+ using TSelf = TMeteringConfig;
+
+ TMeteringConfig() = default;
+ TMeteringConfig(const Ydb::RateLimiter::MeteringConfig&);
+
+ void SerializeTo(Ydb::RateLimiter::MeteringConfig&) const;
+
+ // Meter consumed resources and send billing metrics.
+ FLUENT_SETTING_DEFAULT(bool, Enabled, false);
+
+ // Period to report consumption history from clients to kesus
+ // Default value is inherited from parent or equals 5000 ms for root.
+ FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportPeriod);
+
+ // Consumption history period that is sent in one message to metering actor.
+ // Default value is inherited from parent or equals 1000 ms for root.
+ FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, MeterPeriod);
+
+ // Time window to collect data from every client.
+ // Any client metering message that is `collect_period` late is discarded (not metered or billed).
+ // Default value is inherited from parent or equals 30 seconds for root.
+ FLUENT_SETTING_OPTIONAL(std::chrono::seconds, CollectPeriod);
+
+ // Provisioned consumption limit in units per second.
+ // Effective value is limited by corresponding `max_units_per_second`.
+ // Default value is 0 (not inherited).
+ FLUENT_SETTING_OPTIONAL(double, ProvisionedUnitsPerSecond);
+
+ // Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units.
+ // Effective value is limited by corresponding PrefetchCoefficient.
+ // Default value is inherited from parent or equals 60 for root.
+ FLUENT_SETTING_OPTIONAL(double, ProvisionedCoefficient);
+
+ // On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units.
+ // Should be greater or equal to 1.0
+ // Default value is inherited from parent or equals 1.1 for root
+ FLUENT_SETTING_OPTIONAL(double, OvershootCoefficient);
+
+ // Consumption within provisioned limit.
+ // Informative metric that should be sent to billing (not billed).
+ FLUENT_SETTING_OPTIONAL(TMetric, Provisioned);
+
+ // Consumption that exceeds provisioned limit is billed as on-demand.
+ FLUENT_SETTING_OPTIONAL(TMetric, OnDemand);
+
+ // Consumption that exceeds even on-demand limit.
+ // Normally it is free and should not be billed.
+ FLUENT_SETTING_OPTIONAL(TMetric, Overshoot);
};
// Settings for create resource request.
@@ -55,6 +178,8 @@ struct TCreateResourceSettings
{
TCreateResourceSettings() = default;
TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&);
+
+ FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
};
// Settings for alter resource request.
@@ -62,6 +187,7 @@ struct TAlterResourceSettings
: public TOperationRequestSettings<TAlterResourceSettings>
, public THierarchicalDrrSettings<TAlterResourceSettings>
{
+ FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
};
// Settings for drop resource request.
@@ -128,6 +254,14 @@ struct TDescribeResourceResult : public TStatus {
std::optional<double> GetPrefetchWatermark() const {
return PrefetchWatermark_;
}
+
+ std::optional<double> GetImmediatelyFillUpTo() const {
+ return ImmediatelyFillUpTo_;
+ }
+
+ const std::optional<TLeafBehavior>& GetLeafBehavior() const {
+ return LeafBehavior_;
+ }
};
TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result);
@@ -141,9 +275,14 @@ struct TDescribeResourceResult : public TStatus {
return HierarchicalDrrProps_;
}
+ const TMeteringConfig& GetMeteringConfig() const {
+ return MeteringConfig_;
+ }
+
private:
std::string ResourcePath_;
THierarchicalDrrProps HierarchicalDrrProps_;
+ TMeteringConfig MeteringConfig_;
};
using TAsyncDescribeResourceResult = NThreading::TFuture<TDescribeResourceResult>;
diff --git a/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp b/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp
index 8571d45452..bf71542d6d 100644
--- a/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp
+++ b/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp
@@ -7,8 +7,47 @@
#include <ydb/public/api/grpc/ydb_rate_limiter_v1.grpc.pb.h>
#include <src/client/common_client/impl/client.h>
+#include <google/protobuf/util/json_util.h>
+
namespace NYdb::inline V3::NRateLimiter {
+TReplicatedBucketSettings::TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings& proto) {
+ if (proto.has_report_interval_ms()) {
+ ReportInterval_ = std::chrono::milliseconds(proto.report_interval_ms());
+ }
+}
+
+void TReplicatedBucketSettings::SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings& proto) const {
+ if (ReportInterval_) {
+ proto.set_report_interval_ms(ReportInterval_->count());
+ }
+}
+
+TLeafBehavior::EBehavior TLeafBehavior::GetBehavior() const {
+ return static_cast<EBehavior>(BehaviorSettings_.index());
+}
+
+TLeafBehavior::TLeafBehavior(const TReplicatedBucketSettings& replicatedBucket)
+ : BehaviorSettings_(replicatedBucket)
+{
+}
+
+TLeafBehavior::TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings& replicatedBucket)
+ : BehaviorSettings_(replicatedBucket)
+{
+}
+
+const TReplicatedBucketSettings& TLeafBehavior::GetReplicatedBucket() const {
+ return std::get<TReplicatedBucketSettings>(BehaviorSettings_);
+}
+
+void TLeafBehavior::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const {
+ switch (GetBehavior()) {
+ case REPLICATED_BUCKET:
+ return GetReplicatedBucket().SerializeTo(*proto.mutable_replicated_bucket());
+ }
+}
+
template <class TDerived>
THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings& proto) {
if (proto.max_units_per_second()) {
@@ -26,6 +65,18 @@ THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimi
if (proto.prefetch_watermark()) {
PrefetchWatermark_ = proto.prefetch_watermark();
}
+
+ if (proto.has_immediately_fill_up_to()) {
+ ImmediatelyFillUpTo_ = proto.immediately_fill_up_to();
+ }
+
+ switch (proto.leaf_behavior_case()) {
+ case Ydb::RateLimiter::HierarchicalDrrSettings::kReplicatedBucket:
+ LeafBehavior_.emplace(proto.replicated_bucket());
+ break;
+ case Ydb::RateLimiter::HierarchicalDrrSettings::LEAF_BEHAVIOR_NOT_SET:
+ break;
+ }
}
template <class TDerived>
@@ -45,6 +96,105 @@ void THierarchicalDrrSettings<TDerived>::SerializeTo(Ydb::RateLimiter::Hierarchi
if (PrefetchWatermark_) {
proto.set_prefetch_watermark(*PrefetchWatermark_);
}
+
+ if (ImmediatelyFillUpTo_) {
+ proto.set_immediately_fill_up_to(*ImmediatelyFillUpTo_);
+ }
+
+ if (LeafBehavior_) {
+ LeafBehavior_->SerializeTo(proto);
+ }
+}
+
+TMetric::TMetric(const Ydb::RateLimiter::MeteringConfig_Metric& proto) {
+ Enabled_ = proto.enabled();
+ if (proto.billing_period_sec()) {
+ BillingPeriod_ = std::chrono::seconds(proto.billing_period_sec());
+ }
+ for (const auto& [k, v] : proto.labels()) {
+ Labels_[k] = v;
+ }
+ if (proto.has_metric_fields()) {
+ TString jsonStr;
+ if (auto st = google::protobuf::util::MessageToJsonString(proto.metric_fields(), &jsonStr); st.ok()) {
+ MetricFieldsJson_ = jsonStr;
+ }
+ }
+}
+
+void TMetric::SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric& proto) const {
+ proto.set_enabled(Enabled_);
+ if (BillingPeriod_) {
+ proto.set_billing_period_sec(BillingPeriod_->count());
+ }
+ for (const auto& [k, v] : Labels_) {
+ (*proto.mutable_labels())[k] = v;
+ }
+ if (!MetricFieldsJson_.empty()) {
+ google::protobuf::util::JsonStringToMessage(MetricFieldsJson_, proto.mutable_metric_fields());
+ }
+}
+
+TMeteringConfig::TMeteringConfig(const Ydb::RateLimiter::MeteringConfig& proto) {
+ Enabled_ = proto.enabled();
+ if (proto.report_period_ms()) {
+ ReportPeriod_ = std::chrono::milliseconds(proto.report_period_ms());
+ }
+ if (proto.meter_period_ms()) {
+ MeterPeriod_ = std::chrono::milliseconds(proto.meter_period_ms());
+ }
+ if (proto.collect_period_sec()) {
+ CollectPeriod_ = std::chrono::seconds(proto.collect_period_sec());
+ }
+ if (proto.provisioned_units_per_second()) {
+ ProvisionedUnitsPerSecond_ = proto.provisioned_units_per_second();
+ }
+ if (proto.provisioned_coefficient()) {
+ ProvisionedCoefficient_ = proto.provisioned_coefficient();
+ }
+ if (proto.overshoot_coefficient()) {
+ OvershootCoefficient_ = proto.overshoot_coefficient();
+ }
+ if (proto.has_provisioned()) {
+ Provisioned_.emplace(proto.provisioned());
+ }
+ if (proto.has_on_demand()) {
+ OnDemand_.emplace(proto.on_demand());
+ }
+ if (proto.has_overshoot()) {
+ Overshoot_.emplace(proto.overshoot());
+ }
+}
+
+void TMeteringConfig::SerializeTo(Ydb::RateLimiter::MeteringConfig& proto) const {
+ proto.set_enabled(Enabled_);
+ if (ReportPeriod_) {
+ proto.set_report_period_ms(ReportPeriod_->count());
+ }
+ if (MeterPeriod_) {
+ proto.set_meter_period_ms(MeterPeriod_->count());
+ }
+ if (CollectPeriod_) {
+ proto.set_collect_period_sec(CollectPeriod_->count());
+ }
+ if (ProvisionedUnitsPerSecond_) {
+ proto.set_provisioned_units_per_second(*ProvisionedUnitsPerSecond_);
+ }
+ if (ProvisionedCoefficient_) {
+ proto.set_provisioned_coefficient(*ProvisionedCoefficient_);
+ }
+ if (OvershootCoefficient_) {
+ proto.set_overshoot_coefficient(*OvershootCoefficient_);
+ }
+ if (Provisioned_) {
+ Provisioned_->SerializeTo(*proto.mutable_provisioned());
+ }
+ if (OnDemand_) {
+ OnDemand_->SerializeTo(*proto.mutable_on_demand());
+ }
+ if (Overshoot_) {
+ Overshoot_->SerializeTo(*proto.mutable_overshoot());
+ }
}
template struct THierarchicalDrrSettings<TCreateResourceSettings>;
@@ -67,6 +217,9 @@ TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::Rate
, ResourcePath_(result.resource().resource_path())
, HierarchicalDrrProps_(result.resource().hierarchical_drr())
{
+ if (result.resource().has_metering_config()) {
+ MeteringConfig_ = result.resource().metering_config();
+ }
}
TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings)
@@ -102,6 +255,15 @@ public:
if (settings.PrefetchWatermark_) {
hdrr.set_prefetch_watermark(*settings.PrefetchWatermark_);
}
+ if (settings.ImmediatelyFillUpTo_) {
+ hdrr.set_immediately_fill_up_to(*settings.ImmediatelyFillUpTo_);
+ }
+ if (settings.LeafBehavior_) {
+ settings.LeafBehavior_->SerializeTo(hdrr);
+ }
+ if (settings.MeteringConfig_) {
+ settings.MeteringConfig_->SerializeTo(*resource.mutable_metering_config());
+ }
return request;
}