author     tesseract <tesseract@yandex-team.com>  2023-10-23 09:55:47 +0300
committer  tesseract <tesseract@yandex-team.com>  2023-10-23 10:28:15 +0300
commit     cd4cf36a71bb8807d2daf17b944d923e95a2ade4 (patch)
tree       7ffd45f270df116e7edb1a53b59e6ca6be848803
parent     7b57cc21498630e97d4501d6adc7206944ac9af4 (diff)
download   ydb-cd4cf36a71bb8807d2daf17b944d923e95a2ade4.tar.gz
Enable metering for the size of the partition for important consumers
-rw-r--r--  ydb/core/persqueue/metering_sink.cpp                           268
-rw-r--r--  ydb/core/persqueue/metering_sink.h                              46
-rw-r--r--  ydb/core/persqueue/partition.cpp                               113
-rw-r--r--  ydb/core/persqueue/partition.h                                   5
-rw-r--r--  ydb/core/persqueue/partition_write.cpp                          11
-rw-r--r--  ydb/core/persqueue/pq_impl.cpp                                   7
-rw-r--r--  ydb/core/persqueue/user_info.cpp                                 5
-rw-r--r--  ydb/core/persqueue/user_info.h                                   1
-rw-r--r--  ydb/core/persqueue/ut/metering_sink_ut.cpp                      35
-rw-r--r--  ydb/core/persqueue/ut/resources/counters_topics.html             4
-rw-r--r--  ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp                   20
-rw-r--r--  ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp            4
-rw-r--r--  ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp    14
-rw-r--r--  ydb/services/lib/actors/pq_schema_actor.cpp                      4
-rw-r--r--  ydb/services/persqueue_v1/persqueue_ut.cpp                       2
15 files changed, 299 insertions, 240 deletions
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp
index 27f4f76f46..87854dc819 100644
--- a/ydb/core/persqueue/metering_sink.cpp
+++ b/ydb/core/persqueue/metering_sink.cpp
@@ -53,7 +53,6 @@ ui64 TMeteringSink::IncreaseQuantity(EMeteringJson meteringJson, ui64 inc) {
case EMeteringJson::UsedStorageV1:
CurrentUsedStorage_ += inc;
return CurrentUsedStorage_;
-
default:
return 0;
}
@@ -68,11 +67,10 @@ bool TMeteringSink::IsCreated() const {
return Created_;
}
-TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TString& schemeName,
- const THashMap<TString, ui64>& tags,
- const TString& quantityUnit, ui64 quantity,
- TInstant start, TInstant end, TInstant now, const TString& version) {
+TString TMeteringSink::GetMeteringJson(const TMeteringSink::FlushParameters& parameters, ui64 quantity,
+ TInstant start, TInstant end, TInstant now) {
MeteringCounter_.fetch_add(1);
+
TStringStream output;
NJson::TJsonWriter writer(&output, false);
@@ -81,22 +79,22 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt
writer.Write("cloud_id", Parameters_.YcCloudId);
writer.Write("folder_id", Parameters_.YcFolderId);
writer.Write("resource_id", Parameters_.ResourceId);
- writer.Write("id", TStringBuilder() << metricBillingId <<
+ writer.Write("id", TStringBuilder() << parameters.Name <<
"-" << Parameters_.YdbDatabaseId <<
"-" << Parameters_.TabletId <<
"-" << start.MilliSeconds() <<
- "-" << MeteringCounter_.load());
- writer.Write("schema", schemeName);
+ "-" << GetMeteringCounter());
+ writer.Write("schema", parameters.Schema);
writer.OpenMap("tags");
- for (const auto& [tag, value] : tags) {
+ for (const auto& [tag, value] : parameters.Tags) {
writer.Write(tag, value);
}
writer.CloseMap(); // "tags"
writer.OpenMap("usage");
writer.Write("quantity", quantity);
- writer.Write("unit", quantityUnit);
+ writer.Write("unit", parameters.Units);
writer.Write("start", start.Seconds());
writer.Write("finish", end.Seconds());
writer.CloseMap(); // "usage"
@@ -106,7 +104,7 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt
writer.Write("ydb_database", Parameters_.YdbDatabaseId);
writer.CloseMap(); // "labels"
- writer.Write("version", version);
+ writer.Write("version", parameters.Version);
writer.Write("source_id", Parameters_.TabletId);
writer.Write("source_wt", now.Seconds());
writer.CloseMap();
@@ -116,180 +114,128 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt
}
-void TMeteringSink::Flush(TInstant now, bool force) {
-
- for (auto whichOne : WhichToFlush_) {
- bool needFlush = force;
- TString units;
- TString schema;
- TString name;
-
- switch (whichOne) {
- case EMeteringJson::UsedStorageV1: {
- units = "byte*second";
- schema = "ydb.serverless.v1";
- name = "used_storage";
-
- needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]);
- if (!needFlush) {
- break;
- }
- ui64 duration = (now - LastFlush_[whichOne]).MilliSeconds();
- ui64 avgUsage = CurrentUsedStorage_ * 1_MB * 1000 / duration;
- CurrentUsedStorage_ = 0;
- const THashMap<TString, ui64> tags = {
- {"ydb_size", avgUsage}
- };
-
-
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
- while (interval < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "byte*second",
- (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], interval, now, "1.0.0");
- LastFlush_[whichOne] = interval;
- FlushFunction_(metricsJson);
- interval += Parameters_.FlushLimit;
- }
- if (LastFlush_[whichOne] < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "byte*second",
- (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], now, now, "1.0.0");
- LastFlush_[whichOne] = now;
- FlushFunction_(metricsJson);
- }
- }
- break;
-
-
+const TMeteringSink::FlushParameters TMeteringSink::GetFlushParameters(const EMeteringJson type, const TInstant& now, const TInstant& lastFlush) {
+ switch (type) {
+ case EMeteringJson::PutEventsV1: {
+ ui64 putUnits = CurrentPutUnitsQuantity_;
- case EMeteringJson::PutEventsV1: {
- units = "put_units";
- schema = "yds.events.puts.v1";
- name = "put_events";
+ CurrentPutUnitsQuantity_ = 0;
- auto& putUnits = CurrentPutUnitsQuantity_;
+ return TMeteringSink::FlushParameters(
+ "put_units",
+ "yds.events.puts.v1",
+ "put_events",
+ putUnits
+ ).withOneFlush();
+ }
- needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]);
- if (!needFlush) {
- break;
- }
- const auto isTimeToFlushUnits = now.Hours() > LastFlush_[whichOne].Hours();
- if (isTimeToFlushUnits || needFlush) {
- if (putUnits > 0) {
- // If we jump over a hour edge, report requests metrics for a previous hour
- const TInstant requestsEndTime = isTimeToFlushUnits
- ? TInstant::Hours(LastFlush_[whichOne].Hours() + 1) : now;
-
- const auto record = GetMeteringJson(
- units, schema, {}, name, putUnits,
- LastFlush_[whichOne], requestsEndTime, now);
- FlushFunction_(record);
- }
- putUnits = 0;
- LastFlush_[whichOne] = now;
- }
- }
- break;
- case EMeteringJson::ResourcesReservedV1: {
- needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]);
- if (!needFlush) {
- break;
- }
- const TString name = "reserved_resources";
- const TString schema = "yds.resources.reserved.v1";
- const THashMap<TString, ui64> tags = {
+ case EMeteringJson::ResourcesReservedV1: {
+ return TMeteringSink::FlushParameters(
+ "reserved_resources",
+ "yds.resources.reserved.v1",
+ "second",
+ Parameters_.PartitionsSize
+ ).withTags({
{"reserved_throughput_bps", Parameters_.WriteQuota},
{"reserved_consumers_count", Parameters_.ConsumersCount},
{"reserved_storage_bytes", Parameters_.ReservedSpace}
- };
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
- while (interval < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "second",
- Parameters_.PartitionsSize * (interval - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], interval, now);
- LastFlush_[whichOne] = interval;
- FlushFunction_(metricsJson);
- interval += Parameters_.FlushLimit;
- }
- if (LastFlush_[whichOne] < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "second",
- Parameters_.PartitionsSize * (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], now, now);
- LastFlush_[whichOne] = now;
- FlushFunction_(metricsJson);
- }
- }
- break;
+ });
+ }
- case EMeteringJson::ThroughputV1: {
- needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]);
- if (!needFlush) {
- break;
- }
- const TString name = "yds.reserved_resources";
- const TString schema = "yds.throughput.reserved.v1";
- const THashMap<TString, ui64> tags = {
+ case EMeteringJson::ThroughputV1: {
+ return TMeteringSink::FlushParameters(
+ "yds.reserved_resources",
+ "yds.throughput.reserved.v1",
+ "second",
+ Parameters_.PartitionsSize
+ ).withTags({
{"reserved_throughput_bps", Parameters_.WriteQuota},
{"reserved_consumers_count", Parameters_.ConsumersCount}
- };
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
+ });
+ }
- auto tryFlush = [&](TInstant start, TInstant finish) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "second",
- Parameters_.PartitionsSize * (finish.Seconds() - start.Seconds()),
- start, finish, now);
- FlushFunction_(metricsJson);
- LastFlush_[whichOne] = finish;
- };
+ case EMeteringJson::StorageV1: {
+ return TMeteringSink::FlushParameters(
+ "yds.reserved_resources",
+ "yds.storage.reserved.v1",
+ "mbyte*second",
+ Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB)
+ );
+ }
- while (interval < now) {
- tryFlush(LastFlush_[whichOne], interval);
- interval += Parameters_.FlushLimit;
- }
- if (LastFlush_[whichOne] < now) {
- tryFlush(LastFlush_[whichOne], now);
- }
+ case EMeteringJson::UsedStorageV1: {
+ ui64 duration = (now - lastFlush).MilliSeconds();
+ ui64 avgUsage = CurrentUsedStorage_ * 1_MB * 1000 / duration;
+
+ CurrentUsedStorage_ = 0;
+
+ return TMeteringSink::FlushParameters(
+ "used_storage",
+ "ydb.serverless.v1",
+ "byte*second"
+ ).withTags({
+ {"ydb_size", avgUsage}
+ })
+ .withVersion("1.0.0");
+ }
+
+ default:
+ Y_ABORT_UNLESS(false);
+ };
+}
+
+
+void TMeteringSink::Flush(TInstant now, bool force) {
+
+ for (auto whichOne : WhichToFlush_) {
+ auto& lastFlush = LastFlush_[whichOne];
+ bool needFlush = force || IsTimeToFlush(now, lastFlush);
+ if (!needFlush) {
+ continue;
}
- break;
- case EMeteringJson::StorageV1: {
- needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]);
- if (!needFlush) {
- break;
+ auto parameters = GetFlushParameters(whichOne, now, lastFlush);
+
+ if (parameters.OneFlush) {
+ const auto isTimeToFlushUnits = now.Hours() > lastFlush.Hours();
+ if (parameters.Quantity > 0) {
+            // If we jump over an hour edge, report request metrics for the previous hour
+ const TInstant requestsEndTime = isTimeToFlushUnits
+ ? TInstant::Hours(lastFlush.Hours() + 1) : now;
+
+ const auto record = GetMeteringJson(
+ parameters,
+ parameters.Quantity,
+ lastFlush,
+ requestsEndTime,
+ now);
+ FlushFunction_(record);
}
- const TString name = "yds.reserved_resources";
- const TString schema = "yds.storage.reserved.v1";
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
-
+
+ lastFlush = now;
+ } else {
+ auto interval = TInstant::Hours(lastFlush.Hours()) + Parameters_.FlushLimit;
+
auto tryFlush = [&](TInstant start, TInstant finish) {
const auto metricsJson = GetMeteringJson(
- name, schema, {}, "mbyte*second",
- Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) *
- (finish.Seconds() - start.Seconds()),
- start, finish, now);
+ parameters,
+ parameters.Quantity * (finish.Seconds() - start.Seconds()),
+ start,
+ finish,
+ now);
FlushFunction_(metricsJson);
- LastFlush_[whichOne] = finish;
+
+ lastFlush = finish;
};
while (interval < now) {
- tryFlush(LastFlush_[whichOne], interval);
+ tryFlush(lastFlush, interval);
interval += Parameters_.FlushLimit;
}
- if (LastFlush_[whichOne] < now) {
- tryFlush(LastFlush_[whichOne], now);
+ if (lastFlush < now) {
+ tryFlush(lastFlush, now);
}
}
- break;
-
- default:
- Y_ABORT_UNLESS(false);
- }
}
}
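
Editorial note on the refactor above: every metering kind that is not marked OneFlush now shares the same interval-splitting loop, which cuts the span since the last flush at an hour-aligned boundary and then steps by Parameters_.FlushLimit. The following is a minimal standalone sketch of that loop, for illustration only (plain integer seconds and a callback stand in for TInstant, Parameters_ and FlushFunction_; it is not the production code):

#include <cstdint>
#include <functional>
#include <iostream>

// Sketch of the shared flush-interval splitting in TMeteringSink::Flush():
// the span [lastFlush, now) is cut at an hour-aligned point plus the flush
// period, and each resulting sub-interval is reported separately.
// All times are plain seconds here.
void FlushIntervals(uint64_t lastFlush, uint64_t now, uint64_t flushLimit,
                    const std::function<void(uint64_t, uint64_t)>& report) {
    uint64_t interval = (lastFlush / 3600) * 3600 + flushLimit; // hour start + one period
    while (interval < now) {
        report(lastFlush, interval);
        lastFlush = interval;
        interval += flushLimit;
    }
    if (lastFlush < now) {
        report(lastFlush, now); // remaining tail up to "now"
    }
}

int main() {
    // 2.5 hours since the last flush with an hourly limit -> three records.
    FlushIntervals(1000, 1000 + 9000, 3600, [](uint64_t s, uint64_t f) {
        std::cout << s << " -> " << f << "\n";
    });
}
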
diff --git a/ydb/core/persqueue/metering_sink.h b/ydb/core/persqueue/metering_sink.h
index 30021281e7..d2f643a2c2 100644
--- a/ydb/core/persqueue/metering_sink.h
+++ b/ydb/core/persqueue/metering_sink.h
@@ -1,5 +1,8 @@
#pragma once
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/map.h>
#include <util/generic/set.h>
@@ -30,6 +33,7 @@ public:
ui64 ReservedSpace;
ui64 ConsumersCount;
};
+
bool Create(TInstant now, const TParameters& p, const TSet<EMeteringJson>& whichToFlush,
std::function<void(TString)> howToFlush);
void MayFlush(TInstant now);
@@ -40,10 +44,6 @@ public:
TParameters GetParameters() const;
bool IsCreated() const;
- TString GetMeteringJson(const TString& metricBillingId, const TString& schemeName,
- const THashMap<TString, ui64>& tags, const TString& quantityUnit, ui64 quantity,
- TInstant start, TInstant end, TInstant now, const TString& version = "v1");
-
ui64 GetMeteringCounter() const;
private:
@@ -60,6 +60,44 @@ private:
private:
void Flush(TInstant now, bool force);
+ struct FlushParameters {
+ FlushParameters(TString&& name,
+ TString&& schema,
+ TString&& units,
+ ui64 quantity = 1)
+ : Name(std::move(name))
+ , Schema(std::move(schema))
+ , Units(std::move(units))
+ , Quantity(quantity) {
+ }
+
+ FlushParameters& withTags(THashMap<TString, ui64>&& tags) {
+ Tags = std::move(tags);
+ return *this;
+ }
+
+ FlushParameters& withVersion(TString&& version) {
+ Version = std::move(version);
+ return *this;
+ }
+
+ FlushParameters& withOneFlush() {
+ OneFlush = true;
+ return *this;
+ }
+
+ TString Name;
+ TString Schema;
+ TString Units;
+ ui64 Quantity;
+ THashMap<TString, ui64> Tags;
+ bool OneFlush = false;
+ TString Version = "v1";
+ };
+
+ const FlushParameters GetFlushParameters(const EMeteringJson type, const TInstant& now, const TInstant& lastFlush);
+ TString GetMeteringJson(const TMeteringSink::FlushParameters& parameters, ui64 quantity, TInstant start, TInstant end, TInstant now);
+
bool IsTimeToFlush(TInstant now, TInstant last) const;
};
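
The FlushParameters type introduced above is a small builder: the constructor carries the required name/schema/units (plus an optional base quantity), and withTags/withVersion/withOneFlush chain the optional fields. Below is a hedged usage sketch of the same chaining style, written as a standalone program with std types substituted for TString/THashMap; the concrete values are made up for illustration:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

// Standalone stand-in for TMeteringSink::FlushParameters, kept shape-compatible
// so the chaining used in GetFlushParameters() is easy to read.
struct FlushParameters {
    FlushParameters(std::string name, std::string schema, std::string units, uint64_t quantity = 1)
        : Name(std::move(name)), Schema(std::move(schema)), Units(std::move(units)), Quantity(quantity) {}

    FlushParameters& withTags(std::unordered_map<std::string, uint64_t>&& tags) { Tags = std::move(tags); return *this; }
    FlushParameters& withVersion(std::string&& version) { Version = std::move(version); return *this; }
    FlushParameters& withOneFlush() { OneFlush = true; return *this; }

    std::string Name, Schema, Units;
    uint64_t Quantity;
    std::unordered_map<std::string, uint64_t> Tags;
    bool OneFlush = false;
    std::string Version = "v1";
};

int main() {
    // Shaped like the ResourcesReservedV1 branch: required fields in the constructor,
    // optional tags chained afterwards; defaults (OneFlush=false, Version="v1") stay untouched.
    FlushParameters reserved =
        FlushParameters("reserved_resources", "yds.resources.reserved.v1", "second", /*partitions=*/4)
            .withTags({{"reserved_throughput_bps", 1048576},
                       {"reserved_consumers_count", 2},
                       {"reserved_storage_bytes", 50ull << 30}});
    return reserved.OneFlush ? 1 : 0;
}
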
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 933b97eb61..ad31361e58 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -159,29 +159,27 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
}
void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
+ const auto now = ctx.Now();
Responses.emplace_back(
message.Body,
- WriteQuota->GetQuotedTime(ctx.Now()) - message.QuotedTime,
- (ctx.Now() - TInstant::Zero()) - message.QueueTime,
- ctx.Now()
+ WriteQuota->GetQuotedTime(now) - message.QuotedTime,
+ (now - TInstant::Zero()) - message.QueueTime,
+ now
);
}
-ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const {
- ui64 size = Size();
- if (!DataKeysBody.empty()) {
- size -= DataKeysBody.front().Size;
- }
- auto expired = ctx.Now() - TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds());
- for(size_t i = 0; i < HeadKeys.size(); ++i) {
- auto& key = HeadKeys[i];
- if (expired < key.Timestamp) {
- break;
- }
- size -= key.Size;
+ui64 TPartition::MeteringDataSize(const TActorContext& /*ctx*/) const {
+ if (DataKeysBody.size() <= 1) {
+        // Tiny optimization: we do not meter very small queues (up to 16 MB)
+ return 0;
}
- Y_ABORT_UNLESS(size >= 0, "Metering data size must be positive");
- return size;
+
+    // We assume that DataKeysBody contains an up-to-date set of blobs whose relevance is
+    // maintained by the background cleanup process. However, the first blob may still contain
+    // several stale messages, so we conservatively exclude the size of that entire blob.
+ ui64 size = Size() - DataKeysBody[0].Size;
+ Y_VERIFY_DEBUG(size >= 0, "Metering data size must be positive");
+ return std::max<ui64>(size, 0);
}
ui64 TPartition::ReserveSize() const {
@@ -200,10 +198,26 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
const auto now = ctx.Now();
const auto duration = now - LastUsedStorageMeterTimestamp;
LastUsedStorageMeterTimestamp = now;
- ui64 size = MeteringDataSize(ctx);
+
+    ui64 size = MeteringDataSize(ctx) > ReserveSize() ? MeteringDataSize(ctx) - ReserveSize() : 0; // clamp so the unsigned subtraction cannot wrap
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
}
+ui64 TPartition::ImportantClientsMinOffset() const {
+ const auto& partConfig = Config.GetPartitionConfig();
+
+ ui64 minOffset = EndOffset;
+ for (const auto& importantClientId : partConfig.GetImportantClientId()) {
+ const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
+ ui64 curOffset = StartOffset;
+ if (userInfo && userInfo->Offset >= 0) //-1 means no offset
+ curOffset = userInfo->Offset;
+ minOffset = Min<ui64>(minOffset, curOffset);
+ }
+
+ return minOffset;
+}
+
void TPartition::HandleWakeup(const TActorContext& ctx) {
FilterDeadlinedWrites(ctx);
@@ -304,15 +318,8 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
return false;
const auto& partConfig = Config.GetPartitionConfig();
- ui64 minOffset = EndOffset;
- for (const auto& importantClientId : partConfig.GetImportantClientId()) {
- TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
- ui64 curOffset = StartOffset;
- if (userInfo && userInfo->Offset >= 0) //-1 means no offset
- curOffset = userInfo->Offset;
- minOffset = Min<ui64>(minOffset, curOffset);
- }
+ ui64 minOffset = ImportantClientsMinOffset();
bool hasDrop = false;
ui64 endOffset = StartOffset;
@@ -320,36 +327,34 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
? std::optional<ui64>{partConfig.GetStorageLimitBytes()} : std::nullopt;
const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())};
- if (DataKeysBody.size() > 1) {
- auto retentionCondition = [&]() -> bool {
- const auto bodySize = BodySize - DataKeysBody.front().Size;
- const bool timeRetention = (ctx.Now() >= (DataKeysBody.front().Timestamp + lifetimeLimit));
- return storageLimit.has_value()
- ? ((bodySize >= *storageLimit) || timeRetention)
- : timeRetention;
- };
-
- while (DataKeysBody.size() > 1 &&
- retentionCondition() &&
- (minOffset > DataKeysBody[1].Key.GetOffset() ||
- (minOffset == DataKeysBody[1].Key.GetOffset() &&
- DataKeysBody[1].Key.GetPartNo() == 0))) { // all offsets from blob[0] are readed, and don't delete last blob
- BodySize -= DataKeysBody.front().Size;
-
- DataKeysBody.pop_front();
- if (!GapOffsets.empty() && DataKeysBody.front().Key.GetOffset() == GapOffsets.front().second) {
- GapSize -= GapOffsets.front().second - GapOffsets.front().first;
- GapOffsets.pop_front();
- }
- hasDrop = true;
- }
+ auto retentionCondition = [&]() -> bool {
+ const auto bodySize = BodySize - DataKeysBody.front().Size;
+ const bool timeRetention = (ctx.Now() >= (DataKeysBody.front().Timestamp + lifetimeLimit));
+ return storageLimit.has_value()
+ ? ((bodySize >= *storageLimit) || timeRetention)
+ : timeRetention;
+ };
- Y_ABORT_UNLESS(!DataKeysBody.empty());
+ while (DataKeysBody.size() > 1 &&
+ retentionCondition() &&
+ (minOffset > DataKeysBody[1].Key.GetOffset() ||
+ (minOffset == DataKeysBody[1].Key.GetOffset() &&
+ DataKeysBody[1].Key.GetPartNo() == 0))) { // all offsets from blob[0] are readed, and don't delete last blob
+ BodySize -= DataKeysBody.front().Size;
- endOffset = DataKeysBody.front().Key.GetOffset();
- if (DataKeysBody.front().Key.GetPartNo() > 0) {
- ++endOffset;
+ DataKeysBody.pop_front();
+ if (!GapOffsets.empty() && DataKeysBody.front().Key.GetOffset() == GapOffsets.front().second) {
+ GapSize -= GapOffsets.front().second - GapOffsets.front().first;
+ GapOffsets.pop_front();
}
+ hasDrop = true;
+ }
+
+ Y_ABORT_UNLESS(!DataKeysBody.empty());
+
+ endOffset = DataKeysBody.front().Key.GetOffset();
+ if (DataKeysBody.front().Key.GetPartNo() > 0) {
+ ++endOffset;
}
if (!hasDrop)
@@ -1213,7 +1218,7 @@ void TPartition::ReportCounters(const TActorContext& ctx, bool force) {
void TPartition::Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext&) {
for (auto& [consumerStr, quota] : ev->Get()->UpdatedConsumerQuotas) {
- TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr);
+ const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr);
if (userInfo) {
userInfo->LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Set(quota);
}
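
A note on the used-storage accounting introduced in partition.cpp above: only the persisted bytes above the reserved capacity are billed, integrated over the time since the previous measurement and expressed in MB*seconds. The sketch below restates that arithmetic with plain integers; the function and parameter names are illustrative, not the real TPartition members:

#include <cstdint>
#include <iostream>

// Sketch of TPartition::GetUsedStorage(): bill only the bytes exceeding the
// reservation, multiplied by the elapsed time and converted to MB*seconds.
uint64_t UsedStorageIncrement(uint64_t meteringDataSizeBytes,
                              uint64_t reserveSizeBytes,
                              uint64_t elapsedMilliseconds) {
    const uint64_t MB = 1024ull * 1024ull;
    const uint64_t billable = meteringDataSizeBytes > reserveSizeBytes
        ? meteringDataSizeBytes - reserveSizeBytes
        : 0; // same clamp as in GetUsedStorage(): never let the subtraction wrap
    return billable * elapsedMilliseconds / 1000 / MB; // MB * seconds
}

int main() {
    // 48 MB persisted, 16 MB reserved, 10 seconds elapsed -> 32 MB * 10 s = 320.
    std::cout << UsedStorageIncrement(48ull << 20, 16ull << 20, 10000) << "\n";
}
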
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index b5ab6ecd9e..82ed105391 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -341,10 +341,15 @@ public:
return BodySize + Head.PackedSize;
}
+    // The size of the data really persisted in the storage by the partition
ui64 MeteringDataSize(const TActorContext& ctx) const;
+ // The size of the storage that was reserved by the partition
ui64 ReserveSize() const;
+    // The size of the storage used by the partition. It includes both the reserved space and the really persisted data.
ui64 StorageSize(const TActorContext& ctx) const;
ui64 UsedReserveSize(const TActorContext& ctx) const;
+    // The minimal offset whose data cannot be deleted because it is still required by an important consumer
+ ui64 ImportantClientsMinOffset() const;
//Bootstrap sends kvRead
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index f1f570ca30..7bac53df95 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1502,7 +1502,16 @@ bool TPartition::WaitingForPreviousBlobQuota() const {
}
bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize) const {
- return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize();
+ if (!SubDomainOutOfSpace || !AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) {
+ return false;
+ }
+
+ if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == Config.GetMeteringMode()) {
+        // We allow one message to be written even when SubDomainOutOfSpace is set.
+ return withSize > 0 || Size() > 0;
+ }
+
+ return MeteringDataSize(ctx) + withSize > ReserveSize();
}
void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) {
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 91d08d4286..8ef8502a66 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -922,7 +922,9 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
return;
}
- TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1};
+ TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1,
+ EMeteringJson::ResourcesReservedV1,
+ EMeteringJson::UsedStorageV1};
ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() *
Config.GetPartitionConfig().GetLifetimeSeconds()};
@@ -930,7 +932,8 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes();
whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1,
EMeteringJson::ThroughputV1,
- EMeteringJson::StorageV1};
+ EMeteringJson::StorageV1,
+ EMeteringJson::UsedStorageV1};
}
switch (Config.GetMeteringMode()) {
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index 207be67085..131f9057fb 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -152,6 +152,11 @@ TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorConte
return it->second;
}
+const TUserInfo* TUsersInfoStorage::GetIfExists(const TString& user) const {
+ auto it = UsersInfo.find(user);
+ return it != UsersInfo.end() ? &it->second : nullptr;
+}
+
TUserInfo* TUsersInfoStorage::GetIfExists(const TString& user) {
auto it = UsersInfo.find(user);
return it != UsersInfo.end() ? &it->second : nullptr;
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 4d485f3c7e..3d6319accd 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -373,6 +373,7 @@ public:
void Parse(const TString& key, const TString& data, const TActorContext& ctx);
TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {});
+ const TUserInfo* GetIfExists(const TString& user) const;
TUserInfo* GetIfExists(const TString& user);
void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
diff --git a/ydb/core/persqueue/ut/metering_sink_ut.cpp b/ydb/core/persqueue/ut/metering_sink_ut.cpp
index 82664aaff6..ee40fab7fe 100644
--- a/ydb/core/persqueue/ut/metering_sink_ut.cpp
+++ b/ydb/core/persqueue/ut/metering_sink_ut.cpp
@@ -167,6 +167,41 @@ Y_UNIT_TEST(FlushStorageV1) {
UNIT_ASSERT_VALUES_EQUAL(fullMetering, referenceStorageJson);
}
+Y_UNIT_TEST(UsedStorageV1) {
+ TString fullMetering;
+ const ui64 creationTs = 1651752943168786;
+ const ui64 flushTs = 1651754943168786;
+ const ui32 partitions = 7;
+ const ui64 reservedSpace = 42_GB;
+
+ TMeteringSink meteringSink;
+ meteringSink.Create(TInstant::FromValue(creationTs), {
+ .FlushInterval = TDuration::Seconds(10),
+ .TabletId = "tabletId",
+ .YcCloudId = "cloudId",
+ .YcFolderId = "folderId",
+ .YdbDatabaseId = "databaseId",
+ .StreamName = "streamName",
+ .ResourceId = "streamPath",
+ .PartitionsSize = partitions,
+ .ReservedSpace = reservedSpace,
+ }, {EMeteringJson::UsedStorageV1}, [&](TString json) {
+ fullMetering = TStringBuilder() << fullMetering << '\n' << json;
+ });
+
+ const ui32 quantity = 13;
+
+ meteringSink.IncreaseQuantity(EMeteringJson::UsedStorageV1, quantity);
+ meteringSink.MayFlushForcibly(TInstant::FromValue(flushTs));
+
+ const TString referenceStorageJson = TStringBuilder() <<
+ "\n{\"cloud_id\":\"cloudId\",\"folder_id\":\"folderId\",\"resource_id\":\"streamPath\","
+ << "\"id\":\"used_storage-databaseId-tabletId-1651752943168-" << meteringSink.GetMeteringCounter() << "\","
+ << "\"schema\":\"ydb.serverless.v1\",\"tags\":{\"ydb_size\":6815},\"usage\":{\"quantity\":2000,\"unit\":\"byte*second\",\"start\":1651752943,\"finish\":1651754943},"
+ << "\"labels\":{\"datastreams_stream_name\":\"streamName\",\"ydb_database\":\"databaseId\"},\"version\":\"1.0.0\",\"source_id\":\"tabletId\",\"source_wt\":1651754943}\n";
+ UNIT_ASSERT_VALUES_EQUAL(fullMetering, referenceStorageJson);
+}
+
} // Y_UNIT_TEST_SUITE(MeteringSink)
} // namespace NKikimr::NPQ
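
The expected numbers in the reference JSON above can be reproduced by hand from the sink's formulas: the window is flushTs - creationTs = 2,000,000,000 microseconds = 2,000 seconds, so the billed quantity is 2000, and the ydb_size tag is the average usage 13 * 1_MB * 1000 / duration_ms = 13 * 1048576 * 1000 / 2,000,000 = 6815 after integer division. A tiny standalone check of that arithmetic, assuming those formulas (not part of the test suite):

#include <cassert>
#include <cstdint>

int main() {
    // Values from the UsedStorageV1 unit test above (timestamps in microseconds).
    const uint64_t creationTs = 1651752943168786ull;
    const uint64_t flushTs    = 1651754943168786ull;
    const uint64_t durationMs = (flushTs - creationTs) / 1000; // 2,000,000 ms
    const uint64_t MB = 1024ull * 1024ull;

    const uint64_t increased = 13; // IncreaseQuantity(EMeteringJson::UsedStorageV1, 13)
    assert(increased * MB * 1000 / durationMs == 6815); // "ydb_size" tag
    assert(durationMs / 1000 == 2000);                  // "quantity" in "usage"
    return 0;
}
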
diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html
index 4e7686f4f2..2084d1e219 100644
--- a/ydb/core/persqueue/ut/resources/counters_topics.html
+++ b/ydb/core/persqueue/ut/resources/counters_topics.html
@@ -16,7 +16,7 @@ host=:
name=topic.partition.read.inflight_throttled_microseconds_max: 0
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
name=topic.partition.read.throttled_microseconds_max: 0
- name=topic.partition.storage_bytes_max: 747
+ name=topic.partition.storage_bytes_max: 0
name=topic.partition.total_count: 2
name=topic.partition.uptime_milliseconds_min: 30000
name=topic.partition.write.bytes_per_day_max: 540
@@ -29,7 +29,7 @@ host=:
name=topic.producers_count: 3
name=topic.reserve.limit_bytes: 0
name=topic.reserve.used_bytes: 0
- name=topic.storage_bytes: 747
+ name=topic.storage_bytes: 0
consumer=client:
name=topic.partition.alive_count: 1
diff --git a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
index b3444b45cf..12dce31e05 100644
--- a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
@@ -436,21 +436,23 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
PartitionPerTablet: 3
PQTabletConfig {
PartitionConfig {
- LifetimeSeconds: 11
+ LifetimeSeconds: 2678400
WriteSpeedInBytesPerSecond : 17
}
MeteringMode: METERING_MODE_RESERVED_CAPACITY
}
)");
env.TestWaitNotification(runtime, txId);
- Assert(3 * 11 * 17, 0); // 561, 0
+ Assert(3 * 2678400 * 17, 0); // 136598400, 0
+
+ auto msg = TString(24_MB, '_');
ui32 msgSeqNo = 100;
- WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
+ WriteToTopic(runtime, topicPath, msgSeqNo, msg);
env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
- Assert(3 * 11 * 17, 69); // 69 - it is unstable value. it can change if internal message store change
+        Assert(3 * 2678400 * 17, 16975296); // 16975296 is an unstable value; it can change if the internal message store changes
}
Y_UNIT_TEST(TopicPeriodicStatMeteringModeRequest) {
@@ -504,15 +506,17 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
UNIT_ASSERT_EQUAL_C(0, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer");
+ auto msg = TString(24_MB, '_');
+
ui32 msgSeqNo = 100;
- WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
+ WriteToTopic(runtime, topicPath, ++msgSeqNo, msg);
env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
- Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change
+        Assert(16975296, 0); // 16975296 is an unstable value; it can change if the internal message store changes
stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
- UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
+ UNIT_ASSERT_EQUAL_C(16975296, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer");
appData.PQConfig.SetBalancerWakeupIntervalSec(30);
@@ -520,7 +524,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
GracefulRestartTablet(runtime, balancerId, sender);
stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
- UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload");
+ UNIT_ASSERT_EQUAL_C(16975296, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload");
UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer after reload");
}
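
For the updated assertions above, the first argument is simply the product of the topic settings used by the test: 3 partitions * 2678400 LifetimeSeconds * 17 WriteSpeedInBytesPerSecond = 136,598,400 (presumably the reserved size in bytes); the 16975296 figure is the observed data size after the 24 MB write and, as the comment notes, is not stable. A one-line check of the product:

#include <cassert>
#include <cstdint>

int main() {
    const uint64_t partitions = 3, lifetimeSeconds = 2678400, writeSpeedBps = 17;
    assert(partitions * lifetimeSeconds * writeSpeedBps == 136598400ull);
    return 0;
}
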
diff --git a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp
index 2fd2050a03..af8d11da64 100644
--- a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp
@@ -3178,8 +3178,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
UNIT_ASSERT_EQUAL_C(false, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer");
+ auto msg = TString(24_MB, '_');
+
ui32 seqNo = 100;
- WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, "Message 0");
+ WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, msg);
env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"),
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index e3aaee33e4..6d262b6731 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -274,12 +274,17 @@ namespace NYdb::NTopic::NTests {
DescribeConsumer(setup, client, true, false, false, false);
DescribePartition(setup, client, true, false, false, false);
+ const size_t messagesCount = 1;
+
// Write a message
{
- auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID);
+ auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID).Codec(ECodec::RAW);
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
- std::string message(10_KB, 'x');
- UNIT_ASSERT(writeSession->Write(message));
+ std::string message(32_MB, 'x');
+
+ for(size_t i = 0; i < messagesCount; ++i) {
+ UNIT_ASSERT(writeSession->Write(message));
+ }
writeSession->Close();
}
@@ -313,9 +318,10 @@ namespace NYdb::NTopic::NTests {
TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
UNIT_ASSERT(event);
auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(event.Get());
+
UNIT_ASSERT_C(commitOffsetAck, DebugString(*event));
- UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), messagesCount);
}
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 1b710ff7f8..6e4e25d23e 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -269,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (hasPassword) {
return TMsgPqCodes("incorrect client service type password", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
- if (AppData(ctx)->PQConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required
+ if (pqConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required
return TMsgPqCodes("no client service type password provided", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
}
@@ -292,7 +292,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
if (rr.important()) {
- if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) {
return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 816779ecba..534f58706d 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4805,7 +4805,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << response.DebugString() << "\n" << res.DebugString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 800);
+ UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 0);
UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1);
}