diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-09-02 15:23:44 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-09-02 15:23:44 +0300 |
commit | 3aee423fc13b315cbe5df6913a0fa382deab955c (patch) | |
tree | 242b370f2f76f0a2e6585bab38b21b963483aa08 | |
parent | 2528da896c1e112c1752e53e6a77d56cb3918b28 (diff) | |
download | ydb-3aee423fc13b315cbe5df6913a0fa382deab955c.tar.gz |
Check the metering status before applying the metering mode
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_base.cpp | 134 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h | 4 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 4 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.cpp | 70 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 7 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 93 |
9 files changed, 163 insertions, 191 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 556cf19a72b..466b72ef1b9 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -126,6 +126,7 @@ message TPQConfig { } message TBillingMeteringConfig { + // Enables billing & allows to use MeteringMode (see below) optional bool Enabled = 1 [default = false]; optional uint64 FlushIntervalSec = 2 [default = 30]; optional TReadMeteringConfig Read = 3; @@ -309,10 +310,7 @@ message TPQTabletConfig { METERING_MODE_RESERVED_CAPACITY = 0; METERING_MODE_REQUEST_UNITS = 1; } - // Metering mode used in config optional EMeteringMode MeteringMode = 34; - // Metering mode passed in the request - optional EMeteringMode RequestMeteringMode = 35; } message TMessageGroupInfo { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index 1eaa8cb3f63..1a192f1c620 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -75,10 +75,10 @@ public: } TPersQueueGroupInfo::TPtr ParseParams( - TOperationContext& context, bool isServerlessDomain, + TOperationContext& context, NKikimrPQ::TPQTabletConfig* tabletConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& alter, - TEvSchemeShard::EStatus& status, TString& errStr) + TString& errStr) { TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo(); const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize(); @@ -113,17 +113,6 @@ public: return nullptr; } - if (alterConfig.HasRequestMeteringMode()) { - if (!isServerlessDomain) { - status = NKikimrScheme::StatusPreconditionFailed; - errStr = "Metering mode can only be specified in a serverless domain"; - return nullptr; - } else { - alterConfig.SetMeteringMode(alterConfig.GetRequestMeteringMode()); - alterConfig.ClearRequestMeteringMode(); - } - } - if (alterConfig.GetPartitionConfig().ExplicitChannelProfilesSize() > 0) { // Validate explicit channel profiles alter attempt const auto& ecps = alterConfig.GetPartitionConfig().GetExplicitChannelProfiles(); @@ -466,13 +455,10 @@ public: } newTabletConfig = tabletConfig; - const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS); - TEvSchemeShard::EStatus parseStatus = NKikimrScheme::StatusInvalidParameter; - TPersQueueGroupInfo::TPtr alterData = ParseParams( - context, context.SS->IsServerlessDomain(domainPath), &newTabletConfig, alter, parseStatus, errStr); + TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); if (!alterData) { - result->SetError(parseStatus, errStr); + result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index 613b6da6e45..4aca35cf9a1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -12,7 +12,7 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool isServerlessDomain, +TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, const NKikimrSchemeOp::TPersQueueGroupDescription& op, TEvSchemeShard::EStatus& status, TString& errStr) { @@ -147,17 +147,6 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool return nullptr; } - if (tabletConfig.HasRequestMeteringMode()) { - if (!isServerlessDomain) { - status = NKikimrScheme::StatusPreconditionFailed; - errStr = "Metering mode can only be specified in a serverless domain"; - return nullptr; - } else { - tabletConfig.SetMeteringMode(tabletConfig.GetRequestMeteringMode()); - tabletConfig.ClearRequestMeteringMode(); - } - } - const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId()); if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) { auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id"); @@ -387,9 +376,8 @@ public: return result; } - const auto domainPath = TPath::Init(dstPath.GetPathIdForDomain(), context.SS); TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup( - context, context.SS->IsServerlessDomain(domainPath), createDEscription, status, errStr); + context, createDEscription, status, errStr); if (!pqGroup.Get()) { result->SetError(status, errStr); diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp index f298bd1c7b8..dc4e61efc72 100644 --- a/ydb/core/tx/schemeshard/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base.cpp @@ -6379,110 +6379,19 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { TTestEnv env(runtime); ui64 txId = 100; - // create shared db - TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"( - Name: "Shared" - )"); - env.TestWaitNotification(runtime, txId); - - TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( - Name: "Shared" - StoragePools { - Name: "pool-1" - Kind: "pool-kind-1" - } - StoragePools { - Name: "pool-2" - Kind: "pool-kind-2" - } - PlanResolution: 50 - Coordinators: 1 - Mediators: 1 - TimeCastBucketsPerMediator: 2 - ExternalSchemeShard: true - ExternalHive: false - )"); - env.TestWaitNotification(runtime, txId); - - ui64 sharedSchemeShard = 0; - TestDescribeResult(DescribePath(runtime, "/MyRoot/Shared"), { - NLs::PathExist, - NLs::IsExternalSubDomain("Shared"), - NLs::ExtractTenantSchemeshard(&sharedSchemeShard) - }); - - UNIT_ASSERT(sharedSchemeShard != 0 && sharedSchemeShard != TTestTxConfig::SchemeShard); - - // create serverless db - TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"( - Name: "Serverless" - ResourcesDomainKey { - SchemeShard: %lu - PathId: 2 - } - )", TTestTxConfig::SchemeShard)); - env.TestWaitNotification(runtime, txId); - - TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( - Name: "Serverless" - StoragePools { - Name: "pool-1" - Kind: "pool-kind-1" - } - PlanResolution: 50 - Coordinators: 1 - Mediators: 1 - TimeCastBucketsPerMediator: 2 - ExternalSchemeShard: true - ExternalHive: false - )"); - env.TestWaitNotification(runtime, txId); - - ui64 serverlessSchemeShard = 0; - TestDescribeResult(DescribePath(runtime, "/MyRoot/Serverless"), { - NLs::PathExist, - NLs::IsExternalSubDomain("Serverless"), - NLs::ExtractTenantSchemeshard(&serverlessSchemeShard) - }); - - UNIT_ASSERT(serverlessSchemeShard != 0 && serverlessSchemeShard != TTestTxConfig::SchemeShard); - - // create topic in a serverless db - TestCreatePQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"( + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( Name: "Topic1" TotalGroupCount: 1 PartitionPerTablet: 1 PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - RequestMeteringMode: METERING_MODE_REQUEST_UNITS - } - )"); - env.TestWaitNotification(runtime, txId, serverlessSchemeShard); - - TestDescribeResult( - DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic1"), { - NLs::PathExist, - NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { - const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig(); - UNIT_ASSERT(config.HasMeteringMode()); - UNIT_ASSERT(config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - } - } - ); - - TestCreatePQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"( - Name: "Topic2" - TotalGroupCount: 1 - PartitionPerTablet: 1 - PQTabletConfig { - PartitionConfig { LifetimeSeconds: 10 } MeteringMode: METERING_MODE_REQUEST_UNITS } )"); - env.TestWaitNotification(runtime, txId, serverlessSchemeShard); + env.TestWaitNotification(runtime, txId); TestDescribeResult( - DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic2"), { + DescribePath(runtime, "/MyRoot/Topic1"), { NLs::PathExist, NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig(); @@ -6492,18 +6401,17 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } ); - // alter topic in a serverless db - TestAlterPQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"( - Name: "Topic2" + TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY + MeteringMode: METERING_MODE_RESERVED_CAPACITY } )"); - env.TestWaitNotification(runtime, txId, serverlessSchemeShard); + env.TestWaitNotification(runtime, txId); TestDescribeResult( - DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic2"), { + DescribePath(runtime, "/MyRoot/Topic1"), { NLs::PathExist, NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig(); @@ -6513,18 +6421,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } ); - // try to create topic in a non-serverless db - TestCreatePQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"( - Name: "Topic1" - TotalGroupCount: 1 - PartitionPerTablet: 1 - PQTabletConfig { - PartitionConfig { LifetimeSeconds: 10 } - RequestMeteringMode: METERING_MODE_REQUEST_UNITS - } - )", {NKikimrScheme::StatusPreconditionFailed}); - - TestCreatePQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"( + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( Name: "Topic2" TotalGroupCount: 1 PartitionPerTablet: 1 @@ -6532,10 +6429,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { PartitionConfig { LifetimeSeconds: 10 } } )"); - env.TestWaitNotification(runtime, txId, sharedSchemeShard); + env.TestWaitNotification(runtime, txId); TestDescribeResult( - DescribePath(runtime, sharedSchemeShard, "/MyRoot/Shared/Topic2"), { + DescribePath(runtime, "/MyRoot/Topic2"), { NLs::PathExist, NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig(); @@ -6543,15 +6440,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } } ); - - // alter topic in a non-serverless db - TestAlterPQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"( - Name: "Topic2" - PQTabletConfig { - PartitionConfig { LifetimeSeconds: 10 } - RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY - } - )", {NKikimrScheme::StatusPreconditionFailed}); } Y_UNIT_TEST(DropTable) { //+ diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h index 40559602f6f..e8f9910670c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h @@ -93,6 +93,10 @@ public: AnnoyingClient->UpdateDC(name, local, enabled); } + const NYdb::TDriver& GetDriver() const { + return CleverServer->GetDriver(); + } + public: TSimpleSharedPtr<TPortManager> PortManager; ui16 Port; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 9ed0870c1a7..1e7be043d8a 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -139,7 +139,9 @@ namespace NKikimr::NDataStreams::V1 { pqDescr->SetPartitionPerTablet(1); // TODO: support StreamMode - pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + } TString error; auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error, diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 138dced9e24..4bf336d16a9 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -849,7 +849,40 @@ namespace NKikimr::NGRpcProxy::V1 { return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST); } + static bool FillMeteringMode(Ydb::Topic::MeteringMode mode, NKikimrPQ::TPQTabletConfig& config, + bool meteringEnabled, bool isAlter, Ydb::StatusIds::StatusCode& code, TString& error) + { + if (meteringEnabled) { + switch (mode) { + case Ydb::Topic::METERING_MODE_UNSPECIFIED: + if (!isAlter) { + config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + } + break; + case Ydb::Topic::METERING_MODE_REQUEST_UNITS: + config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + break; + case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY: + config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + break; + default: + code = Ydb::StatusIds::BAD_REQUEST; + error = "Unknown metering mode"; + return false; + } + } else { + switch (mode) { + case Ydb::Topic::METERING_MODE_UNSPECIFIED: + break; + default: + code = Ydb::StatusIds::PRECONDITION_FAILED; + error = "Metering mode can only be specified in a serverless database"; + return false; + } + } + return true; + } Ydb::StatusIds::StatusCode FillProposeRequestImpl( const TString& name, const Ydb::Topic::CreateTopicRequest& request, @@ -975,19 +1008,9 @@ namespace NKikimr::NGRpcProxy::V1 { } } - switch (request.metering_mode()) { - case Ydb::Topic::METERING_MODE_UNSPECIFIED: - config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - break; - case Ydb::Topic::METERING_MODE_REQUEST_UNITS: - config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - break; - case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY: - config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); - break; - default: - error = "Unknown metering mode"; - return Ydb::StatusIds::BAD_REQUEST; + Ydb::StatusIds::StatusCode code; + if (!FillMeteringMode(request.metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), false, code, error)) { + return code; } const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); @@ -1003,8 +1026,6 @@ namespace NKikimr::NGRpcProxy::V1 { return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST); } - - Ydb::StatusIds::StatusCode FillProposeRequestImpl( const Ydb::Topic::AlterTopicRequest& request, NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx, @@ -1015,6 +1036,8 @@ namespace NKikimr::NGRpcProxy::V1 { return Ydb::StatusIds::BAD_REQUEST;\ } + const auto& pqConfig = AppData(ctx)->PQConfig; + if (request.has_alter_partitioning_settings() && request.alter_partitioning_settings().has_set_min_active_partitions()) { CHECK_CDC; auto parts = request.alter_partitioning_settings().set_min_active_partitions(); @@ -1058,7 +1081,7 @@ namespace NKikimr::NGRpcProxy::V1 { } bool local = true; //todo: check locality - if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + if (local || pqConfig.GetTopicsAreFirstClassCitizen()) { if (request.has_set_partition_write_speed_bytes_per_second()) { CHECK_CDC; auto partSpeed = request.set_partition_write_speed_bytes_per_second(); @@ -1099,18 +1122,9 @@ namespace NKikimr::NGRpcProxy::V1 { } } - switch (request.set_metering_mode()) { - case Ydb::Topic::METERING_MODE_UNSPECIFIED: - break; // do not change - case Ydb::Topic::METERING_MODE_REQUEST_UNITS: - config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - break; - case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY: - config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); - break; - default: - error = "Unknown metering mode"; - return Ydb::StatusIds::BAD_REQUEST; + Ydb::StatusIds::StatusCode code; + if (!FillMeteringMode(request.set_metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), true, code, error)) { + return code; } const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 3bee991d2eb..fe23b799d07 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -508,7 +508,9 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv (*result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000); (*result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts(); - if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + const auto& pqConfig = AppData(ctx)->PQConfig; + + if (local || pqConfig.GetTopicsAreFirstClassCitizen()) { result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond()); result.set_partition_write_burst_bytes(partConfig.GetBurstSize()); } @@ -517,7 +519,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1)); } - if (response.DomainInfo && response.DomainInfo->IsServerless()) { + if (pqConfig.GetBillingMeteringConfig().GetEnabled()) { switch (config.GetMeteringMode()) { case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY: result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY); @@ -530,7 +532,6 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } } - const auto& pqConfig = AppData(ctx)->PQConfig; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { auto rr = result.add_consumers(); auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 8c6b26b34b8..2e8dd7931f0 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3615,7 +3615,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" - MeteringMode: METERING_MODE_REQUEST_UNITS } ErrorCode: OK } @@ -4737,6 +4736,98 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { TestReadRuleServiceTypePasswordImpl(true); } + void CreateTopicWithMeteringMode(bool meteringEnabled) { + TServerSettings serverSettings = PQSettings(0); + serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled); + NPersQueue::TTestServer server(serverSettings); + + using namespace NYdb::NTopic; + auto client = TTopicClient(server.GetDriver()); + + for (const auto mode : {EMeteringMode::RequestUnits, EMeteringMode::ReservedCapacity}) { + const TString path = TStringBuilder() << "/Root/PQ/Topic" << mode; + + auto res = client.CreateTopic(path, TCreateTopicSettings() + .MeteringMode(mode) + ).ExtractValueSync(); + + if (!meteringEnabled) { + UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED); + continue; + } + + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + auto desc = client.DescribeTopic(path).ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), mode); + } + } + + Y_UNIT_TEST(CreateTopicWithMeteringMode) { + CreateTopicWithMeteringMode(false); + CreateTopicWithMeteringMode(true); + } + + void SetMeteringMode(bool meteringEnabled) { + TServerSettings serverSettings = PQSettings(0); + serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled); + NPersQueue::TTestServer server(serverSettings); + + using namespace NYdb::NTopic; + auto client = TTopicClient(server.GetDriver()); + + { + auto res = client.CreateTopic("/Root/PQ/ttt").ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + for (const auto mode : {EMeteringMode::RequestUnits, EMeteringMode::ReservedCapacity}) { + auto res = client.AlterTopic("/Root/PQ/ttt", TAlterTopicSettings() + .SetMeteringMode(mode) + ).ExtractValueSync(); + + if (!meteringEnabled) { + UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED); + continue; + } + + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + auto desc = client.DescribeTopic("/Root/PQ/ttt").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), mode); + } + } + + Y_UNIT_TEST(SetMeteringMode) { + SetMeteringMode(false); + SetMeteringMode(true); + } + + void DefaultMeteringMode(bool meteringEnabled) { + TServerSettings serverSettings = PQSettings(0); + serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled); + NPersQueue::TTestServer server(serverSettings); + + using namespace NYdb::NTopic; + auto client = TTopicClient(server.GetDriver()); + + auto res = client.CreateTopic("/Root/PQ/ttt").ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + + auto desc = client.DescribeTopic("/Root/PQ/ttt").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), (meteringEnabled + ? EMeteringMode::RequestUnits + : EMeteringMode::Unspecified)); + } + + Y_UNIT_TEST(DefaultMeteringMode) { + DefaultMeteringMode(false); + DefaultMeteringMode(true); + } Y_UNIT_TEST(TClusterTrackerTest) { APITestSetup setup{TEST_CASE_NAME}; |