diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-07-28 15:35:11 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-07-28 15:35:11 +0300 |
commit | 7be068e753e63db22f92c306d08f0363da928908 (patch) | |
tree | 37e618d16f8d818c8a79a5729bf632ebe341804c | |
parent | 0836b1ef7920fc395f848c298b2ea601d83dbab2 (diff) | |
download | ydb-7be068e753e63db22f92c306d08f0363da928908.tar.gz |
Metering modes
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 63 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 6 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.cpp | 7 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 44 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 |
8 files changed, 99 insertions, 25 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 506a18c3e46..75b325d5409 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -824,33 +824,46 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { const auto streamPath = Config.GetTopicPath(); auto& pqConfig = AppData(ctx)->PQConfig; - if (pqConfig.HasBillingMeteringConfig() && pqConfig.GetBillingMeteringConfig().GetEnabled()) { - TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1}; - ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * - Config.GetPartitionConfig().GetLifetimeSeconds()}; - - if (Config.GetPartitionConfig().HasStorageLimitBytes()) { - storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes(); - whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1, - EMeteringJson::ThroughputV1, - EMeteringJson::StorageV1}; - } + if (!pqConfig.GetBillingMeteringConfig().GetEnabled()) { + LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " disable metering" + << ": reason# " << "billing is not enabled in BillingMeteringConfig"); + return; + } - MeteringSink.Create(ctx.Now(), { - .FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()), - .TabletId = ToString(TabletID()), - .YcCloudId = Config.GetYcCloudId(), - .YcFolderId = Config.GetYcFolderId(), - .YdbDatabaseId = Config.GetYdbDatabaseId(), - .StreamName = streamName, - .ResourceId = streamPath, - .PartitionsSize = Config.PartitionsSize(), - .WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), - .ReservedSpace = storageLimitBytes, - .ConsumersThroughput = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * - Config.ReadRulesSize(), - }, whichToFlush, std::bind(NMetering::SendMeteringJson, ctx, std::placeholders::_1)); + switch (Config.GetMeteringMode()) { + case NKikimrPQ::TPQTabletConfig::METERING_MODE_SERVERLESS: + LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " disable metering" + << ": reason# " << "METERING_MODE_SERVERLESS"); + return; + default: + break; } + + TSet<EMeteringJson> whichToFlush{EMeteringJson::PutEventsV1, EMeteringJson::ResourcesReservedV1}; + ui64 storageLimitBytes{Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * + Config.GetPartitionConfig().GetLifetimeSeconds()}; + + if (Config.GetPartitionConfig().HasStorageLimitBytes()) { + storageLimitBytes = Config.GetPartitionConfig().GetStorageLimitBytes(); + whichToFlush = TSet<EMeteringJson>{EMeteringJson::PutEventsV1, + EMeteringJson::ThroughputV1, + EMeteringJson::StorageV1}; + } + + MeteringSink.Create(ctx.Now(), { + .FlushInterval = TDuration::Seconds(pqConfig.GetBillingMeteringConfig().GetFlushIntervalSec()), + .TabletId = ToString(TabletID()), + .YcCloudId = Config.GetYcCloudId(), + .YcFolderId = Config.GetYcFolderId(), + .YdbDatabaseId = Config.GetYdbDatabaseId(), + .StreamName = streamName, + .ResourceId = streamPath, + .PartitionsSize = Config.PartitionsSize(), + .WriteQuota = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), + .ReservedSpace = storageLimitBytes, + .ConsumersThroughput = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * + Config.ReadRulesSize(), + }, whichToFlush, std::bind(NMetering::SendMeteringJson, ctx, std::placeholders::_1)); } void TPersQueue::ReturnTabletState(const TActorContext& ctx, const TChangeNotification& req, NKikimrProto::EReplyStatus status) diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index feb024e9a2b..9f10064bf9c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -292,6 +292,12 @@ message TPQTabletConfig { optional TPartitionKeyRange KeyRange = 2; } repeated TPartition Partitions = 31; // filled by schemeshard + + enum EMeteringMode { + METERING_MODE_YDS = 0; + METERING_MODE_SERVERLESS = 1; + } + optional EMeteringMode MeteringMode = 34; } message TMessageGroupInfo { diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp index 03a68b73124..069ba8e7071 100644 --- a/ydb/core/testlib/basics/appdata.cpp +++ b/ydb/core/testlib/basics/appdata.cpp @@ -186,4 +186,11 @@ namespace NKikimr { } } + void TAppPrepare::SetEnablePqBilling(std::optional<bool> value) + { + if (value) { + PQConfig.MutableBillingMeteringConfig()->SetEnabled(*value); + } + } + } diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index 93c7e6a9888..865cf3e8529 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -78,6 +78,7 @@ namespace NKikimr { void SetHiveStoragePoolFreshPeriod(ui64 value); void AddSystemBackupSID(const TString& sid); void SetEnableProtoSourceIdInfo(std::optional<bool> value); + void SetEnablePqBilling(std::optional<bool> value); TIntrusivePtr<TChannelProfiles> Channels; NKikimrBlobStorage::TNodeWardenServiceSet BSConf; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 38c8862cd6b..de5dc705baf 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -655,6 +655,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx auto& pqConfig = *desc.MutablePQTabletConfig(); pqConfig.SetTopicName(streamName); pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString()); + pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_SERVERLESS); auto& partitionConfig = *pqConfig.MutablePartitionConfig(); partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds()); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 008fd2b7bb7..412b09c828d 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -1,3 +1,4 @@ +#include <ydb/core/metering/metering.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/schemeshard/schemeshard_impl.h> @@ -625,4 +626,47 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { env.TestWaitNotification(runtime, txId); } + Y_UNIT_TEST(Metering) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnablePqBilling(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + UniformPartitionsCount: 2 + )"); + env.TestWaitNotification(runtime, txId); + + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_NOTICE); + TVector<TString> meteringRecords; + runtime.SetObserverFunc([&meteringRecords](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() != NMetering::TEvMetering::EvWriteMeteringJson) { + return TTestActorRuntime::EEventAction::PROCESS; + } + + meteringRecords.push_back(ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson); + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + for (int i = 0; i < 10; ++i) { + UNIT_ASSERT(meteringRecords.empty()); + env.SimulateSleep(runtime, TDuration::Seconds(10)); + } + } + } // TCdcStreamTests diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index f4823fbd622..146bb9f10d5 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -503,6 +503,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableNotNullColumns(opts.EnableNotNullColumns_); app.SetEnableOlapSchemaOperations(opts.EnableOlapSchemaOperations_); app.SetEnableProtoSourceIdInfo(opts.EnableProtoSourceIdInfo_); + app.SetEnablePqBilling(opts.EnablePqBilling_); app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_); app.FeatureFlags.SetEnablePublicApiExternalBlobs(true); app.SetEnableMoveIndex(opts.EnableMoveIndex_); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index bbc3ad8c6e8..ce65a9fef0e 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -40,6 +40,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional<bool>, EnableNotNullColumns, std::nullopt); OPTION(std::optional<bool>, EnableOlapSchemaOperations, true); OPTION(std::optional<bool>, EnableProtoSourceIdInfo, std::nullopt); + OPTION(std::optional<bool>, EnablePqBilling, std::nullopt); OPTION(std::optional<bool>, EnableBackgroundCompaction, std::nullopt); OPTION(std::optional<bool>, DisableStatsBatching, std::nullopt); OPTION(THashSet<TString>, SystemBackupSIDs, {}); |