aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-07-28 15:35:11 +0300
committerilnaz <ilnaz@ydb.tech>2022-07-28 15:35:11 +0300
commit7be068e753e63db22f92c306d08f0363da928908 (patch)
tree37e618d16f8d818c8a79a5729bf632ebe341804c
parent0836b1ef7920fc395f848c298b2ea601d83dbab2 (diff)
downloadydb-7be068e753e63db22f92c306d08f0363da928908.tar.gz
Metering modes
-rw-r--r--ydb/core/persqueue/pq_impl.cpp63
-rw-r--r--ydb/core/protos/pqconfig.proto6
-rw-r--r--ydb/core/testlib/basics/appdata.cpp7
-rw-r--r--ydb/core/testlib/basics/appdata.h1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp44
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.h1
8 files changed, 99 insertions, 25 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 506a18c3e46..75b325d5409 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -824,33 +824,46 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
const auto streamPath = Config.GetTopicPath();
auto& pqConfig = AppData(ctx)->PQConfig;
- if (pqConfig.HasBillingMeteringConfig() && pqConfig.GetBillingMeteringConfig().GetEnabled()) {
- TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1};
- ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() *
- Config.GetPartitionConfig().GetLifetimeSeconds()};
-
- if (Config.GetPartitionConfig().HasStorageLimitBytes()) {
- storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes();
- whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1,
- EMeteringJson::ThroughputV1,
- EMeteringJson::StorageV1};
- }
+ if (!pqConfig.GetBillingMeteringConfig().GetEnabled()) {
+ LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " disable metering"
+ << ": reason# " << "billing is not enabled in BillingMeteringConfig");
+ return;
+ }
- MeteringSink.Create(ctx.Now(), {
- .FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()),
- .TabletId = ToString(TabletID()),
- .YcCloudId = Config.GetYcCloudId(),
- .YcFolderId = Config.GetYcFolderId(),
- .YdbDatabaseId = Config.GetYdbDatabaseId(),
- .StreamName = streamName,
- .ResourceId = streamPath,
- .PartitionsSize = Config.PartitionsSize(),
- .WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
- .ReservedSpace = storageLimitBytes,
- .ConsumersThroughput = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() *
- Config.ReadRulesSize(),
- }, whichToFlush, std::bind(NMetering::SendMeteringJson, ctx, std::placeholders::_1));
+ switch (Config.GetMeteringMode()) {
+ case NKikimrPQ::TPQTabletConfig::METERING_MODE_SERVERLESS:
+ LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " disable metering"
+ << ": reason# " << "METERING_MODE_SERVERLESS");
+ return;
+ default:
+ break;
}
+
+ TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1};
+ ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() *
+ Config.GetPartitionConfig().GetLifetimeSeconds()};
+
+ if (Config.GetPartitionConfig().HasStorageLimitBytes()) {
+ storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes();
+ whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1,
+ EMeteringJson::ThroughputV1,
+ EMeteringJson::StorageV1};
+ }
+
+ MeteringSink.Create(ctx.Now(), {
+ .FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()),
+ .TabletId = ToString(TabletID()),
+ .YcCloudId = Config.GetYcCloudId(),
+ .YcFolderId = Config.GetYcFolderId(),
+ .YdbDatabaseId = Config.GetYdbDatabaseId(),
+ .StreamName = streamName,
+ .ResourceId = streamPath,
+ .PartitionsSize = Config.PartitionsSize(),
+ .WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
+ .ReservedSpace = storageLimitBytes,
+ .ConsumersThroughput = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() *
+ Config.ReadRulesSize(),
+ }, whichToFlush, std::bind(NMetering::SendMeteringJson, ctx, std::placeholders::_1));
}
void TPersQueue::ReturnTabletState(const TActorContext& ctx, const TChangeNotification& req, NKikimrProto::EReplyStatus status)
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index feb024e9a2b..9f10064bf9c 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -292,6 +292,12 @@ message TPQTabletConfig {
optional TPartitionKeyRange KeyRange = 2;
}
repeated TPartition Partitions = 31; // filled by schemeshard
+
+ enum EMeteringMode {
+ METERING_MODE_YDS = 0;
+ METERING_MODE_SERVERLESS = 1;
+ }
+ optional EMeteringMode MeteringMode = 34;
}
message TMessageGroupInfo {
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index 03a68b73124..069ba8e7071 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -186,4 +186,11 @@ namespace NKikimr {
}
}
+ void TAppPrepare::SetEnablePqBilling(std::optional<bool> value)
+ {
+ if (value) {
+ PQConfig.MutableBillingMeteringConfig()->SetEnabled(*value);
+ }
+ }
+
}
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index 93c7e6a9888..865cf3e8529 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -78,6 +78,7 @@ namespace NKikimr {
void SetHiveStoragePoolFreshPeriod(ui64 value);
void AddSystemBackupSID(const TString& sid);
void SetEnableProtoSourceIdInfo(std::optional<bool> value);
+ void SetEnablePqBilling(std::optional<bool> value);
TIntrusivePtr<TChannelProfiles> Channels;
NKikimrBlobStorage::TNodeWardenServiceSet BSConf;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 38c8862cd6b..de5dc705baf 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -655,6 +655,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
auto& pqConfig = *desc.MutablePQTabletConfig();
pqConfig.SetTopicName(streamName);
pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString());
+ pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_SERVERLESS);
auto& partitionConfig = *pqConfig.MutablePartitionConfig();
partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds());
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 008fd2b7bb7..412b09c828d 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -1,3 +1,4 @@
+#include <ydb/core/metering/metering.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
@@ -625,4 +626,47 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
env.TestWaitNotification(runtime, txId);
}
+ Y_UNIT_TEST(Metering) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnablePqBilling(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: 2
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_NOTICE);
+ TVector<TString> meteringRecords;
+ runtime.SetObserverFunc([&meteringRecords](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() != NMetering::TEvMetering::EvWriteMeteringJson) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ meteringRecords.push_back(ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson);
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ for (int i = 0; i < 10; ++i) {
+ UNIT_ASSERT(meteringRecords.empty());
+ env.SimulateSleep(runtime, TDuration::Seconds(10));
+ }
+ }
+
} // TCdcStreamTests
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index f4823fbd622..146bb9f10d5 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -503,6 +503,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableNotNullColumns(opts.EnableNotNullColumns_);
app.SetEnableOlapSchemaOperations(opts.EnableOlapSchemaOperations_);
app.SetEnableProtoSourceIdInfo(opts.EnableProtoSourceIdInfo_);
+ app.SetEnablePqBilling(opts.EnablePqBilling_);
app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_);
app.FeatureFlags.SetEnablePublicApiExternalBlobs(true);
app.SetEnableMoveIndex(opts.EnableMoveIndex_);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
index bbc3ad8c6e8..ce65a9fef0e 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
@@ -40,6 +40,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnableNotNullColumns, std::nullopt);
OPTION(std::optional<bool>, EnableOlapSchemaOperations, true);
OPTION(std::optional<bool>, EnableProtoSourceIdInfo, std::nullopt);
+ OPTION(std::optional<bool>, EnablePqBilling, std::nullopt);
OPTION(std::optional<bool>, EnableBackgroundCompaction, std::nullopt);
OPTION(std::optional<bool>, DisableStatsBatching, std::nullopt);
OPTION(THashSet<TString>, SystemBackupSIDs, {});