| author | tesseract <tesseract@yandex-team.com> | 2023-10-23 09:55:47 +0300 |
| --- | --- | --- |
| committer | tesseract <tesseract@yandex-team.com> | 2023-10-23 10:28:15 +0300 |
| commit | cd4cf36a71bb8807d2daf17b944d923e95a2ade4 (patch) | |
| tree | 7ffd45f270df116e7edb1a53b59e6ca6be848803 | |
| parent | 7b57cc21498630e97d4501d6adc7206944ac9af4 (diff) | |
| download | ydb-cd4cf36a71bb8807d2daf17b944d923e95a2ade4.tar.gz | |
Enable metering for the size of the partition for important consumers
| mode | file | lines changed |
| --- | --- | --- |
| -rw-r--r-- | ydb/core/persqueue/metering_sink.cpp | 268 |
| -rw-r--r-- | ydb/core/persqueue/metering_sink.h | 46 |
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 113 |
| -rw-r--r-- | ydb/core/persqueue/partition.h | 5 |
| -rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 11 |
| -rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 7 |
| -rw-r--r-- | ydb/core/persqueue/user_info.cpp | 5 |
| -rw-r--r-- | ydb/core/persqueue/user_info.h | 1 |
| -rw-r--r-- | ydb/core/persqueue/ut/metering_sink_ut.cpp | 35 |
| -rw-r--r-- | ydb/core/persqueue/ut/resources/counters_topics.html | 4 |
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp | 20 |
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp | 4 |
| -rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp | 14 |
| -rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.cpp | 4 |
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 2 |
15 files changed, 299 insertions, 240 deletions
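The core of the change in metering_sink.cpp, visible in the diff below, is that the per-metric flush branches are collapsed into one generic Flush() loop driven by a new builder-style FlushParameters struct returned from GetFlushParameters(). The following is a simplified, standalone sketch of that builder, not the authoritative YDB code: std::string and std::unordered_map stand in for the util/ TString and THashMap types, and MakeUsedStorageParameters is an illustrative helper that mirrors the UsedStorageV1 branch.

```cpp
// Simplified sketch of the FlushParameters builder introduced by this commit
// (assumption: std:: containers replace the util/ types used in the real code).
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

struct FlushParameters {
    FlushParameters(std::string name, std::string schema, std::string units, uint64_t quantity = 1)
        : Name(std::move(name)), Schema(std::move(schema)), Units(std::move(units)), Quantity(quantity) {}

    FlushParameters& withTags(std::unordered_map<std::string, uint64_t>&& tags) { Tags = std::move(tags); return *this; }
    FlushParameters& withVersion(std::string version) { Version = std::move(version); return *this; }
    FlushParameters& withOneFlush() { OneFlush = true; return *this; }

    std::string Name;      // metric name, becomes part of the billing record id
    std::string Schema;    // billing schema, e.g. "ydb.serverless.v1"
    std::string Units;     // quantity unit, e.g. "byte*second"
    uint64_t Quantity;     // per-second quantity or a one-shot counter value
    std::unordered_map<std::string, uint64_t> Tags;
    bool OneFlush = false; // report once per flush instead of splitting by FlushLimit
    std::string Version = "v1";
};

// Illustrative helper mirroring the UsedStorageV1 branch of GetFlushParameters().
FlushParameters MakeUsedStorageParameters(uint64_t avgUsageBytes) {
    return FlushParameters("used_storage", "ydb.serverless.v1", "byte*second")
        .withTags({{"ydb_size", avgUsageBytes}})
        .withVersion("1.0.0");
}
```

With this structure in place, Flush() only has to distinguish one-shot metrics (put_units, reported at most once per hour boundary) from interval metrics whose quantity is multiplied by elapsed seconds and split into chunks of Parameters_.FlushLimit.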
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp index 27f4f76f46..87854dc819 100644 --- a/ydb/core/persqueue/metering_sink.cpp +++ b/ydb/core/persqueue/metering_sink.cpp @@ -53,7 +53,6 @@ ui64 TMeteringSink::IncreaseQuantity(EMeteringJson meteringJson, ui64 inc) { case EMeteringJson::UsedStorageV1: CurrentUsedStorage_ += inc; return CurrentUsedStorage_; - default: return 0; } @@ -68,11 +67,10 @@ bool TMeteringSink::IsCreated() const { return Created_; } -TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TString& schemeName, - const THashMap<TString, ui64>& tags, - const TString& quantityUnit, ui64 quantity, - TInstant start, TInstant end, TInstant now, const TString& version) { +TString TMeteringSink::GetMeteringJson(const TMeteringSink::FlushParameters& parameters, ui64 quantity, + TInstant start, TInstant end, TInstant now) { MeteringCounter_.fetch_add(1); + TStringStream output; NJson::TJsonWriter writer(&output, false); @@ -81,22 +79,22 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt writer.Write("cloud_id", Parameters_.YcCloudId); writer.Write("folder_id", Parameters_.YcFolderId); writer.Write("resource_id", Parameters_.ResourceId); - writer.Write("id", TStringBuilder() << metricBillingId << + writer.Write("id", TStringBuilder() << parameters.Name << "-" << Parameters_.YdbDatabaseId << "-" << Parameters_.TabletId << "-" << start.MilliSeconds() << - "-" << MeteringCounter_.load()); - writer.Write("schema", schemeName); + "-" << GetMeteringCounter()); + writer.Write("schema", parameters.Schema); writer.OpenMap("tags"); - for (const auto& [tag, value] : tags) { + for (const auto& [tag, value] : parameters.Tags) { writer.Write(tag, value); } writer.CloseMap(); // "tags" writer.OpenMap("usage"); writer.Write("quantity", quantity); - writer.Write("unit", quantityUnit); + writer.Write("unit", parameters.Units); writer.Write("start", start.Seconds()); writer.Write("finish", end.Seconds()); writer.CloseMap(); // "usage" @@ -106,7 +104,7 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt writer.Write("ydb_database", Parameters_.YdbDatabaseId); writer.CloseMap(); // "labels" - writer.Write("version", version); + writer.Write("version", parameters.Version); writer.Write("source_id", Parameters_.TabletId); writer.Write("source_wt", now.Seconds()); writer.CloseMap(); @@ -116,180 +114,128 @@ TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TSt } -void TMeteringSink::Flush(TInstant now, bool force) { - - for (auto whichOne : WhichToFlush_) { - bool needFlush = force; - TString units; - TString schema; - TString name; - - switch (whichOne) { - case EMeteringJson::UsedStorageV1: { - units = "byte*second"; - schema = "ydb.serverless.v1"; - name = "used_storage"; - - needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]); - if (!needFlush) { - break; - } - ui64 duration = (now - LastFlush_[whichOne]).MilliSeconds(); - ui64 avgUsage = CurrentUsedStorage_ * 1_MB * 1000 / duration; - CurrentUsedStorage_ = 0; - const THashMap<TString, ui64> tags = { - {"ydb_size", avgUsage} - }; - - - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; - while (interval < now) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "byte*second", - (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], interval, now, "1.0.0"); - LastFlush_[whichOne] = interval; - FlushFunction_(metricsJson); - interval += 
Parameters_.FlushLimit; - } - if (LastFlush_[whichOne] < now) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "byte*second", - (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], now, now, "1.0.0"); - LastFlush_[whichOne] = now; - FlushFunction_(metricsJson); - } - } - break; - - +const TMeteringSink::FlushParameters TMeteringSink::GetFlushParameters(const EMeteringJson type, const TInstant& now, const TInstant& lastFlush) { + switch (type) { + case EMeteringJson::PutEventsV1: { + ui64 putUnits = CurrentPutUnitsQuantity_; - case EMeteringJson::PutEventsV1: { - units = "put_units"; - schema = "yds.events.puts.v1"; - name = "put_events"; + CurrentPutUnitsQuantity_ = 0; - auto& putUnits = CurrentPutUnitsQuantity_; + return TMeteringSink::FlushParameters( + "put_units", + "yds.events.puts.v1", + "put_events", + putUnits + ).withOneFlush(); + } - needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]); - if (!needFlush) { - break; - } - const auto isTimeToFlushUnits = now.Hours() > LastFlush_[whichOne].Hours(); - if (isTimeToFlushUnits || needFlush) { - if (putUnits > 0) { - // If we jump over a hour edge, report requests metrics for a previous hour - const TInstant requestsEndTime = isTimeToFlushUnits - ? TInstant::Hours(LastFlush_[whichOne].Hours() + 1) : now; - - const auto record = GetMeteringJson( - units, schema, {}, name, putUnits, - LastFlush_[whichOne], requestsEndTime, now); - FlushFunction_(record); - } - putUnits = 0; - LastFlush_[whichOne] = now; - } - } - break; - case EMeteringJson::ResourcesReservedV1: { - needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]); - if (!needFlush) { - break; - } - const TString name = "reserved_resources"; - const TString schema = "yds.resources.reserved.v1"; - const THashMap<TString, ui64> tags = { + case EMeteringJson::ResourcesReservedV1: { + return TMeteringSink::FlushParameters( + "reserved_resources", + "yds.resources.reserved.v1", + "second", + Parameters_.PartitionsSize + ).withTags({ {"reserved_throughput_bps", Parameters_.WriteQuota}, {"reserved_consumers_count", Parameters_.ConsumersCount}, {"reserved_storage_bytes", Parameters_.ReservedSpace} - }; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; - while (interval < now) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "second", - Parameters_.PartitionsSize * (interval - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], interval, now); - LastFlush_[whichOne] = interval; - FlushFunction_(metricsJson); - interval += Parameters_.FlushLimit; - } - if (LastFlush_[whichOne] < now) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "second", - Parameters_.PartitionsSize * (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], now, now); - LastFlush_[whichOne] = now; - FlushFunction_(metricsJson); - } - } - break; + }); + } - case EMeteringJson::ThroughputV1: { - needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]); - if (!needFlush) { - break; - } - const TString name = "yds.reserved_resources"; - const TString schema = "yds.throughput.reserved.v1"; - const THashMap<TString, ui64> tags = { + case EMeteringJson::ThroughputV1: { + return TMeteringSink::FlushParameters( + "yds.reserved_resources", + "yds.throughput.reserved.v1", + "second", + Parameters_.PartitionsSize + ).withTags({ {"reserved_throughput_bps", Parameters_.WriteQuota}, {"reserved_consumers_count", Parameters_.ConsumersCount} - }; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + 
Parameters_.FlushLimit; + }); + } - auto tryFlush = [&](TInstant start, TInstant finish) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "second", - Parameters_.PartitionsSize * (finish.Seconds() - start.Seconds()), - start, finish, now); - FlushFunction_(metricsJson); - LastFlush_[whichOne] = finish; - }; + case EMeteringJson::StorageV1: { + return TMeteringSink::FlushParameters( + "yds.reserved_resources", + "yds.storage.reserved.v1", + "mbyte*second", + Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) + ); + } - while (interval < now) { - tryFlush(LastFlush_[whichOne], interval); - interval += Parameters_.FlushLimit; - } - if (LastFlush_[whichOne] < now) { - tryFlush(LastFlush_[whichOne], now); - } + case EMeteringJson::UsedStorageV1: { + ui64 duration = (now - lastFlush).MilliSeconds(); + ui64 avgUsage = CurrentUsedStorage_ * 1_MB * 1000 / duration; + + CurrentUsedStorage_ = 0; + + return TMeteringSink::FlushParameters( + "used_storage", + "ydb.serverless.v1", + "byte*second" + ).withTags({ + {"ydb_size", avgUsage} + }) + .withVersion("1.0.0"); + } + + default: + Y_ABORT_UNLESS(false); + }; +} + + +void TMeteringSink::Flush(TInstant now, bool force) { + + for (auto whichOne : WhichToFlush_) { + auto& lastFlush = LastFlush_[whichOne]; + bool needFlush = force || IsTimeToFlush(now, lastFlush); + if (!needFlush) { + continue; } - break; - case EMeteringJson::StorageV1: { - needFlush |= IsTimeToFlush(now, LastFlush_[whichOne]); - if (!needFlush) { - break; + auto parameters = GetFlushParameters(whichOne, now, lastFlush); + + if (parameters.OneFlush) { + const auto isTimeToFlushUnits = now.Hours() > lastFlush.Hours(); + if (parameters.Quantity > 0) { + // If we jump over a hour edge, report requests metrics for a previous hour + const TInstant requestsEndTime = isTimeToFlushUnits + ? 
TInstant::Hours(lastFlush.Hours() + 1) : now; + + const auto record = GetMeteringJson( + parameters, + parameters.Quantity, + lastFlush, + requestsEndTime, + now); + FlushFunction_(record); } - const TString name = "yds.reserved_resources"; - const TString schema = "yds.storage.reserved.v1"; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; - + + lastFlush = now; + } else { + auto interval = TInstant::Hours(lastFlush.Hours()) + Parameters_.FlushLimit; + auto tryFlush = [&](TInstant start, TInstant finish) { const auto metricsJson = GetMeteringJson( - name, schema, {}, "mbyte*second", - Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) * - (finish.Seconds() - start.Seconds()), - start, finish, now); + parameters, + parameters.Quantity * (finish.Seconds() - start.Seconds()), + start, + finish, + now); FlushFunction_(metricsJson); - LastFlush_[whichOne] = finish; + + lastFlush = finish; }; while (interval < now) { - tryFlush(LastFlush_[whichOne], interval); + tryFlush(lastFlush, interval); interval += Parameters_.FlushLimit; } - if (LastFlush_[whichOne] < now) { - tryFlush(LastFlush_[whichOne], now); + if (lastFlush < now) { + tryFlush(lastFlush, now); } } - break; - - default: - Y_ABORT_UNLESS(false); - } } } diff --git a/ydb/core/persqueue/metering_sink.h b/ydb/core/persqueue/metering_sink.h index 30021281e7..d2f643a2c2 100644 --- a/ydb/core/persqueue/metering_sink.h +++ b/ydb/core/persqueue/metering_sink.h @@ -1,5 +1,8 @@ #pragma once +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/map.h> #include <util/generic/set.h> @@ -30,6 +33,7 @@ public: ui64 ReservedSpace; ui64 ConsumersCount; }; + bool Create(TInstant now, const TParameters& p, const TSet<EMeteringJson>& whichToFlush, std::function<void(TString)> howToFlush); void MayFlush(TInstant now); @@ -40,10 +44,6 @@ public: TParameters GetParameters() const; bool IsCreated() const; - TString GetMeteringJson(const TString& metricBillingId, const TString& schemeName, - const THashMap<TString, ui64>& tags, const TString& quantityUnit, ui64 quantity, - TInstant start, TInstant end, TInstant now, const TString& version = "v1"); - ui64 GetMeteringCounter() const; private: @@ -60,6 +60,44 @@ private: private: void Flush(TInstant now, bool force); + struct FlushParameters { + FlushParameters(TString&& name, + TString&& schema, + TString&& units, + ui64 quantity = 1) + : Name(std::move(name)) + , Schema(std::move(schema)) + , Units(std::move(units)) + , Quantity(quantity) { + } + + FlushParameters& withTags(THashMap<TString, ui64>&& tags) { + Tags = std::move(tags); + return *this; + } + + FlushParameters& withVersion(TString&& version) { + Version = std::move(version); + return *this; + } + + FlushParameters& withOneFlush() { + OneFlush = true; + return *this; + } + + TString Name; + TString Schema; + TString Units; + ui64 Quantity; + THashMap<TString, ui64> Tags; + bool OneFlush = false; + TString Version = "v1"; + }; + + const FlushParameters GetFlushParameters(const EMeteringJson type, const TInstant& now, const TInstant& lastFlush); + TString GetMeteringJson(const TMeteringSink::FlushParameters& parameters, ui64 quantity, TInstant start, TInstant end, TInstant now); + bool IsTimeToFlush(TInstant now, TInstant last) const; }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 933b97eb61..ad31361e58 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -159,29 +159,27 @@ 
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co } void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { + const auto now = ctx.Now(); Responses.emplace_back( message.Body, - WriteQuota->GetQuotedTime(ctx.Now()) - message.QuotedTime, - (ctx.Now() - TInstant::Zero()) - message.QueueTime, - ctx.Now() + WriteQuota->GetQuotedTime(now) - message.QuotedTime, + (now - TInstant::Zero()) - message.QueueTime, + now ); } -ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const { - ui64 size = Size(); - if (!DataKeysBody.empty()) { - size -= DataKeysBody.front().Size; - } - auto expired = ctx.Now() - TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()); - for(size_t i = 0; i < HeadKeys.size(); ++i) { - auto& key = HeadKeys[i]; - if (expired < key.Timestamp) { - break; - } - size -= key.Size; +ui64 TPartition::MeteringDataSize(const TActorContext& /*ctx*/) const { + if (DataKeysBody.size() <= 1) { + // tiny optimization - we do not meter very small queues up to 16MB + return 0; } - Y_ABORT_UNLESS(size >= 0, "Metering data size must be positive"); - return size; + + // We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is + // maintained by the background process. However, the last block may contain several irrelevant + // messages. Because of them, we throw out the size of the entire blob. + ui64 size = Size() - DataKeysBody[0].Size; + Y_VERIFY_DEBUG(size >= 0, "Metering data size must be positive"); + return std::max<ui64>(size, 0); } ui64 TPartition::ReserveSize() const { @@ -200,10 +198,26 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { const auto now = ctx.Now(); const auto duration = now - LastUsedStorageMeterTimestamp; LastUsedStorageMeterTimestamp = now; - ui64 size = MeteringDataSize(ctx); + + ui64 size = std::max<ui64>(MeteringDataSize(ctx) - ReserveSize(), 0); return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds } +ui64 TPartition::ImportantClientsMinOffset() const { + const auto& partConfig = Config.GetPartitionConfig(); + + ui64 minOffset = EndOffset; + for (const auto& importantClientId : partConfig.GetImportantClientId()) { + const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); + ui64 curOffset = StartOffset; + if (userInfo && userInfo->Offset >= 0) //-1 means no offset + curOffset = userInfo->Offset; + minOffset = Min<ui64>(minOffset, curOffset); + } + + return minOffset; +} + void TPartition::HandleWakeup(const TActorContext& ctx) { FilterDeadlinedWrites(ctx); @@ -304,15 +318,8 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont return false; const auto& partConfig = Config.GetPartitionConfig(); - ui64 minOffset = EndOffset; - for (const auto& importantClientId : partConfig.GetImportantClientId()) { - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); - ui64 curOffset = StartOffset; - if (userInfo && userInfo->Offset >= 0) //-1 means no offset - curOffset = userInfo->Offset; - minOffset = Min<ui64>(minOffset, curOffset); - } + ui64 minOffset = ImportantClientsMinOffset(); bool hasDrop = false; ui64 endOffset = StartOffset; @@ -320,36 +327,34 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont ? 
std::optional<ui64>{partConfig.GetStorageLimitBytes()} : std::nullopt; const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())}; - if (DataKeysBody.size() > 1) { - auto retentionCondition = [&]() -> bool { - const auto bodySize = BodySize - DataKeysBody.front().Size; - const bool timeRetention = (ctx.Now() >= (DataKeysBody.front().Timestamp + lifetimeLimit)); - return storageLimit.has_value() - ? ((bodySize >= *storageLimit) || timeRetention) - : timeRetention; - }; - - while (DataKeysBody.size() > 1 && - retentionCondition() && - (minOffset > DataKeysBody[1].Key.GetOffset() || - (minOffset == DataKeysBody[1].Key.GetOffset() && - DataKeysBody[1].Key.GetPartNo() == 0))) { // all offsets from blob[0] are readed, and don't delete last blob - BodySize -= DataKeysBody.front().Size; - - DataKeysBody.pop_front(); - if (!GapOffsets.empty() && DataKeysBody.front().Key.GetOffset() == GapOffsets.front().second) { - GapSize -= GapOffsets.front().second - GapOffsets.front().first; - GapOffsets.pop_front(); - } - hasDrop = true; - } + auto retentionCondition = [&]() -> bool { + const auto bodySize = BodySize - DataKeysBody.front().Size; + const bool timeRetention = (ctx.Now() >= (DataKeysBody.front().Timestamp + lifetimeLimit)); + return storageLimit.has_value() + ? ((bodySize >= *storageLimit) || timeRetention) + : timeRetention; + }; - Y_ABORT_UNLESS(!DataKeysBody.empty()); + while (DataKeysBody.size() > 1 && + retentionCondition() && + (minOffset > DataKeysBody[1].Key.GetOffset() || + (minOffset == DataKeysBody[1].Key.GetOffset() && + DataKeysBody[1].Key.GetPartNo() == 0))) { // all offsets from blob[0] are readed, and don't delete last blob + BodySize -= DataKeysBody.front().Size; - endOffset = DataKeysBody.front().Key.GetOffset(); - if (DataKeysBody.front().Key.GetPartNo() > 0) { - ++endOffset; + DataKeysBody.pop_front(); + if (!GapOffsets.empty() && DataKeysBody.front().Key.GetOffset() == GapOffsets.front().second) { + GapSize -= GapOffsets.front().second - GapOffsets.front().first; + GapOffsets.pop_front(); } + hasDrop = true; + } + + Y_ABORT_UNLESS(!DataKeysBody.empty()); + + endOffset = DataKeysBody.front().Key.GetOffset(); + if (DataKeysBody.front().Key.GetPartNo() > 0) { + ++endOffset; } if (!hasDrop) @@ -1213,7 +1218,7 @@ void TPartition::ReportCounters(const TActorContext& ctx, bool force) { void TPartition::Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext&) { for (auto& [consumerStr, quota] : ev->Get()->UpdatedConsumerQuotas) { - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr); + const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr); if (userInfo) { userInfo->LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Set(quota); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index b5ab6ecd9e..82ed105391 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -341,10 +341,15 @@ public: return BodySize + Head.PackedSize; } + // The size of the data realy was persisted in the storage by the partition ui64 MeteringDataSize(const TActorContext& ctx) const; + // The size of the storage that was reserved by the partition ui64 ReserveSize() const; + // The size of the storage that usud by the partition. That included combination of the reserver and realy persisted data. 
ui64 StorageSize(const TActorContext& ctx) const; ui64 UsedReserveSize(const TActorContext& ctx) const; + // Minimal offset, the data from which cannot be deleted, because it is required by an important consumer + ui64 ImportantClientsMinOffset() const; //Bootstrap sends kvRead diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index f1f570ca30..7bac53df95 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1502,7 +1502,16 @@ bool TPartition::WaitingForPreviousBlobQuota() const { } bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize) const { - return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize(); + if (!SubDomainOutOfSpace || !AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { + return false; + } + + if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == Config.GetMeteringMode()) { + // We allow one message to be written even when the SubDomainOutOfSpace. + return withSize > 0 || Size() > 0; + } + + return MeteringDataSize(ctx) + withSize > ReserveSize(); } void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 91d08d4286..8ef8502a66 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -922,7 +922,9 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { return; } - TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1}; + TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, + EMeteringJson::ResourcesReservedV1, + EMeteringJson::UsedStorageV1}; ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * Config.GetPartitionConfig().GetLifetimeSeconds()}; @@ -930,7 +932,8 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes(); whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1, EMeteringJson::ThroughputV1, - EMeteringJson::StorageV1}; + EMeteringJson::StorageV1, + EMeteringJson::UsedStorageV1}; } switch (Config.GetMeteringMode()) { diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 207be67085..131f9057fb 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -152,6 +152,11 @@ TUserInfo& TUsersInfoStorage::GetOrCreate(const TString& user, const TActorConte return it->second; } +const TUserInfo* TUsersInfoStorage::GetIfExists(const TString& user) const { + auto it = UsersInfo.find(user); + return it != UsersInfo.end() ? &it->second : nullptr; +} + TUserInfo* TUsersInfoStorage::GetIfExists(const TString& user) { auto it = UsersInfo.find(user); return it != UsersInfo.end() ? 
&it->second : nullptr; diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 4d485f3c7e..3d6319accd 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -373,6 +373,7 @@ public: void Parse(const TString& key, const TString& data, const TActorContext& ctx); TUserInfo& GetOrCreate(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {}); + const TUserInfo* GetIfExists(const TString& user) const; TUserInfo* GetIfExists(const TString& user); void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) { diff --git a/ydb/core/persqueue/ut/metering_sink_ut.cpp b/ydb/core/persqueue/ut/metering_sink_ut.cpp index 82664aaff6..ee40fab7fe 100644 --- a/ydb/core/persqueue/ut/metering_sink_ut.cpp +++ b/ydb/core/persqueue/ut/metering_sink_ut.cpp @@ -167,6 +167,41 @@ Y_UNIT_TEST(FlushStorageV1) { UNIT_ASSERT_VALUES_EQUAL(fullMetering, referenceStorageJson); } +Y_UNIT_TEST(UsedStorageV1) { + TString fullMetering; + const ui64 creationTs = 1651752943168786; + const ui64 flushTs = 1651754943168786; + const ui32 partitions = 7; + const ui64 reservedSpace = 42_GB; + + TMeteringSink meteringSink; + meteringSink.Create(TInstant::FromValue(creationTs), { + .FlushInterval = TDuration::Seconds(10), + .TabletId = "tabletId", + .YcCloudId = "cloudId", + .YcFolderId = "folderId", + .YdbDatabaseId = "databaseId", + .StreamName = "streamName", + .ResourceId = "streamPath", + .PartitionsSize = partitions, + .ReservedSpace = reservedSpace, + }, {EMeteringJson::UsedStorageV1}, [&](TString json) { + fullMetering = TStringBuilder() << fullMetering << '\n' << json; + }); + + const ui32 quantity = 13; + + meteringSink.IncreaseQuantity(EMeteringJson::UsedStorageV1, quantity); + meteringSink.MayFlushForcibly(TInstant::FromValue(flushTs)); + + const TString referenceStorageJson = TStringBuilder() << + "\n{\"cloud_id\":\"cloudId\",\"folder_id\":\"folderId\",\"resource_id\":\"streamPath\"," + << "\"id\":\"used_storage-databaseId-tabletId-1651752943168-" << meteringSink.GetMeteringCounter() << "\"," + << "\"schema\":\"ydb.serverless.v1\",\"tags\":{\"ydb_size\":6815},\"usage\":{\"quantity\":2000,\"unit\":\"byte*second\",\"start\":1651752943,\"finish\":1651754943}," + << "\"labels\":{\"datastreams_stream_name\":\"streamName\",\"ydb_database\":\"databaseId\"},\"version\":\"1.0.0\",\"source_id\":\"tabletId\",\"source_wt\":1651754943}\n"; + UNIT_ASSERT_VALUES_EQUAL(fullMetering, referenceStorageJson); +} + } // Y_UNIT_TEST_SUITE(MeteringSink) } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html index 4e7686f4f2..2084d1e219 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics.html +++ b/ydb/core/persqueue/ut/resources/counters_topics.html @@ -16,7 +16,7 @@ host=: name=topic.partition.read.inflight_throttled_microseconds_max: 0 name=topic.partition.read.speed_limit_bytes_per_second: 20000000000 name=topic.partition.read.throttled_microseconds_max: 0 - name=topic.partition.storage_bytes_max: 747 + name=topic.partition.storage_bytes_max: 0 name=topic.partition.total_count: 2 name=topic.partition.uptime_milliseconds_min: 30000 name=topic.partition.write.bytes_per_day_max: 540 @@ -29,7 +29,7 @@ host=: name=topic.producers_count: 3 name=topic.reserve.limit_bytes: 0 name=topic.reserve.used_bytes: 0 - name=topic.storage_bytes: 747 + name=topic.storage_bytes: 0 consumer=client: name=topic.partition.alive_count: 1 diff --git 
a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp index b3444b45cf..12dce31e05 100644 --- a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp @@ -436,21 +436,23 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { PartitionPerTablet: 3 PQTabletConfig { PartitionConfig { - LifetimeSeconds: 11 + LifetimeSeconds: 2678400 WriteSpeedInBytesPerSecond : 17 } MeteringMode: METERING_MODE_RESERVED_CAPACITY } )"); env.TestWaitNotification(runtime, txId); - Assert(3 * 11 * 17, 0); // 561, 0 + Assert(3 * 2678400 * 17, 0); // 136598400, 0 + + auto msg = TString(24_MB, '_'); ui32 msgSeqNo = 100; - WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); + WriteToTopic(runtime, topicPath, msgSeqNo, msg); env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats - Assert(3 * 11 * 17, 69); // 69 - it is unstable value. it can change if internal message store change + Assert(3 * 2678400 * 17, 16975296); // 16975296 - it is unstable value. it can change if internal message store change } Y_UNIT_TEST(TopicPeriodicStatMeteringModeRequest) { @@ -504,15 +506,17 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { UNIT_ASSERT_EQUAL_C(0, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer"); + auto msg = TString(24_MB, '_'); + ui32 msgSeqNo = 100; - WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); + WriteToTopic(runtime, topicPath, ++msgSeqNo, msg); env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats - Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change + Assert(16975296, 0); // 69 - it is unstable value. 
it can change if internal message store change stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); - UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); + UNIT_ASSERT_EQUAL_C(16975296, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer"); appData.PQConfig.SetBalancerWakeupIntervalSec(30); @@ -520,7 +524,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { GracefulRestartTablet(runtime, balancerId, sender); stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); - UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload"); + UNIT_ASSERT_EQUAL_C(16975296, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload"); UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer after reload"); } diff --git a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp index 2fd2050a03..af8d11da64 100644 --- a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp +++ b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp @@ -3178,8 +3178,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); UNIT_ASSERT_EQUAL_C(false, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer"); + auto msg = TString(24_MB, '_'); + ui32 seqNo = 100; - WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, "Message 0"); + WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, msg); env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"), diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp index e3aaee33e4..6d262b6731 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp @@ -274,12 +274,17 @@ namespace NYdb::NTopic::NTests { DescribeConsumer(setup, client, true, false, false, false); DescribePartition(setup, client, true, false, false, false); + const size_t messagesCount = 1; + // Write a message { - auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID); + auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID).Codec(ECodec::RAW); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); - std::string message(10_KB, 'x'); - UNIT_ASSERT(writeSession->Write(message)); + std::string message(32_MB, 'x'); + + for(size_t i = 0; i < messagesCount; ++i) { + UNIT_ASSERT(writeSession->Write(message)); + } writeSession->Close(); } @@ -313,9 +318,10 @@ namespace NYdb::NTopic::NTests { TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true); UNIT_ASSERT(event); auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(event.Get()); + UNIT_ASSERT_C(commitOffsetAck, DebugString(*event)); - UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), 1); + UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), messagesCount); } } diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 1b710ff7f8..6e4e25d23e 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ 
b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -269,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 { if (hasPassword) { return TMsgPqCodes("incorrect client service type password", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } - if (AppData(ctx)->PQConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required + if (pqConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required return TMsgPqCodes("no client service type password provided", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } } @@ -292,7 +292,7 @@ namespace NKikimr::NGRpcProxy::V1 { } if (rr.important()) { - if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } config->MutablePartitionConfig()->AddImportantClientId(consumerName); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 816779ecba..534f58706d 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4805,7 +4805,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << response.DebugString() << "\n" << res.DebugString() << "\n"; UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 800); + UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 0); UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1); } |
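The reference JSON in the new UsedStorageV1 unit test follows directly from the arithmetic in GetFlushParameters(). Below is a small standalone check of those numbers; it is a sketch for illustration only (the variable names are mine, and the timestamps are microseconds as accepted by TInstant::FromValue), not part of the test suite.

```cpp
// Worked check of the UsedStorageV1 reference values (ydb_size = 6815, quantity = 2000).
#include <cassert>
#include <cstdint>

int main() {
    const uint64_t creationTs   = 1651752943168786; // LastFlush_ right after Create(), microseconds
    const uint64_t flushTs      = 1651754943168786; // MayFlushForcibly() time, microseconds
    const uint64_t usedQuantity = 13;               // IncreaseQuantity(EMeteringJson::UsedStorageV1, 13)

    const uint64_t durationMs = (flushTs - creationTs) / 1000;              // 2,000,000 ms
    const uint64_t ydbSize    = usedQuantity * (1ull << 20) * 1000 / durationMs; // CurrentUsedStorage_ * 1_MB * 1000 / duration
    const uint64_t quantity   = (flushTs - creationTs) / 1000000;           // (now - lastFlush).Seconds()

    assert(ydbSize == 6815);   // matches "tags":{"ydb_size":6815} in the reference JSON
    assert(quantity == 2000);  // matches "usage":{"quantity":2000,...} in the reference JSON
    return 0;
}
```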