aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-02 15:23:44 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-02 15:23:44 +0300
commit3aee423fc13b315cbe5df6913a0fa382deab955c (patch)
tree242b370f2f76f0a2e6585bab38b21b963483aa08
parent2528da896c1e112c1752e53e6a77d56cb3918b28 (diff)
downloadydb-3aee423fc13b315cbe5df6913a0fa382deab955c.tar.gz
Check the metering status before applying the metering mode
-rw-r--r--ydb/core/protos/pqconfig.proto4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp16
-rw-r--r--ydb/core/tx/schemeshard/ut_base.cpp134
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h4
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp4
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp70
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp7
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp93
9 files changed, 163 insertions, 191 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 556cf19a72b..466b72ef1b9 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -126,6 +126,7 @@ message TPQConfig {
}
message TBillingMeteringConfig {
+ // Enables billing & allows to use MeteringMode (see below)
optional bool Enabled = 1 [default = false];
optional uint64 FlushIntervalSec = 2 [default = 30];
optional TReadMeteringConfig Read = 3;
@@ -309,10 +310,7 @@ message TPQTabletConfig {
METERING_MODE_RESERVED_CAPACITY = 0;
METERING_MODE_REQUEST_UNITS = 1;
}
- // Metering mode used in config
optional EMeteringMode MeteringMode = 34;
- // Metering mode passed in the request
- optional EMeteringMode RequestMeteringMode = 35;
}
message TMessageGroupInfo {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index 1eaa8cb3f63..1a192f1c620 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -75,10 +75,10 @@ public:
}
TPersQueueGroupInfo::TPtr ParseParams(
- TOperationContext& context, bool isServerlessDomain,
+ TOperationContext& context,
NKikimrPQ::TPQTabletConfig* tabletConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& alter,
- TEvSchemeShard::EStatus& status, TString& errStr)
+ TString& errStr)
{
TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo();
const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize();
@@ -113,17 +113,6 @@ public:
return nullptr;
}
- if (alterConfig.HasRequestMeteringMode()) {
- if (!isServerlessDomain) {
- status = NKikimrScheme::StatusPreconditionFailed;
- errStr = "Metering mode can only be specified in a serverless domain";
- return nullptr;
- } else {
- alterConfig.SetMeteringMode(alterConfig.GetRequestMeteringMode());
- alterConfig.ClearRequestMeteringMode();
- }
- }
-
if (alterConfig.GetPartitionConfig().ExplicitChannelProfilesSize() > 0) {
// Validate explicit channel profiles alter attempt
const auto& ecps = alterConfig.GetPartitionConfig().GetExplicitChannelProfiles();
@@ -466,13 +455,10 @@ public:
}
newTabletConfig = tabletConfig;
- const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS);
- TEvSchemeShard::EStatus parseStatus = NKikimrScheme::StatusInvalidParameter;
- TPersQueueGroupInfo::TPtr alterData = ParseParams(
- context, context.SS->IsServerlessDomain(domainPath), &newTabletConfig, alter, parseStatus, errStr);
+ TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
if (!alterData) {
- result->SetError(parseStatus, errStr);
+ result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 613b6da6e45..4aca35cf9a1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -12,7 +12,7 @@ namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool isServerlessDomain,
+TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
const NKikimrSchemeOp::TPersQueueGroupDescription& op,
TEvSchemeShard::EStatus& status, TString& errStr)
{
@@ -147,17 +147,6 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool
return nullptr;
}
- if (tabletConfig.HasRequestMeteringMode()) {
- if (!isServerlessDomain) {
- status = NKikimrScheme::StatusPreconditionFailed;
- errStr = "Metering mode can only be specified in a serverless domain";
- return nullptr;
- } else {
- tabletConfig.SetMeteringMode(tabletConfig.GetRequestMeteringMode());
- tabletConfig.ClearRequestMeteringMode();
- }
- }
-
const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId());
if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) {
auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id");
@@ -387,9 +376,8 @@ public:
return result;
}
- const auto domainPath = TPath::Init(dstPath.GetPathIdForDomain(), context.SS);
TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup(
- context, context.SS->IsServerlessDomain(domainPath), createDEscription, status, errStr);
+ context, createDEscription, status, errStr);
if (!pqGroup.Get()) {
result->SetError(status, errStr);
diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp
index f298bd1c7b8..dc4e61efc72 100644
--- a/ydb/core/tx/schemeshard/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base.cpp
@@ -6379,110 +6379,19 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
TTestEnv env(runtime);
ui64 txId = 100;
- // create shared db
- TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"(
- Name: "Shared"
- )");
- env.TestWaitNotification(runtime, txId);
-
- TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"(
- Name: "Shared"
- StoragePools {
- Name: "pool-1"
- Kind: "pool-kind-1"
- }
- StoragePools {
- Name: "pool-2"
- Kind: "pool-kind-2"
- }
- PlanResolution: 50
- Coordinators: 1
- Mediators: 1
- TimeCastBucketsPerMediator: 2
- ExternalSchemeShard: true
- ExternalHive: false
- )");
- env.TestWaitNotification(runtime, txId);
-
- ui64 sharedSchemeShard = 0;
- TestDescribeResult(DescribePath(runtime, "/MyRoot/Shared"), {
- NLs::PathExist,
- NLs::IsExternalSubDomain("Shared"),
- NLs::ExtractTenantSchemeshard(&sharedSchemeShard)
- });
-
- UNIT_ASSERT(sharedSchemeShard != 0 && sharedSchemeShard != TTestTxConfig::SchemeShard);
-
- // create serverless db
- TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"(
- Name: "Serverless"
- ResourcesDomainKey {
- SchemeShard: %lu
- PathId: 2
- }
- )", TTestTxConfig::SchemeShard));
- env.TestWaitNotification(runtime, txId);
-
- TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"(
- Name: "Serverless"
- StoragePools {
- Name: "pool-1"
- Kind: "pool-kind-1"
- }
- PlanResolution: 50
- Coordinators: 1
- Mediators: 1
- TimeCastBucketsPerMediator: 2
- ExternalSchemeShard: true
- ExternalHive: false
- )");
- env.TestWaitNotification(runtime, txId);
-
- ui64 serverlessSchemeShard = 0;
- TestDescribeResult(DescribePath(runtime, "/MyRoot/Serverless"), {
- NLs::PathExist,
- NLs::IsExternalSubDomain("Serverless"),
- NLs::ExtractTenantSchemeshard(&serverlessSchemeShard)
- });
-
- UNIT_ASSERT(serverlessSchemeShard != 0 && serverlessSchemeShard != TTestTxConfig::SchemeShard);
-
- // create topic in a serverless db
- TestCreatePQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"(
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic1"
TotalGroupCount: 1
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- RequestMeteringMode: METERING_MODE_REQUEST_UNITS
- }
- )");
- env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
-
- TestDescribeResult(
- DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic1"), {
- NLs::PathExist,
- NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
- const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig();
- UNIT_ASSERT(config.HasMeteringMode());
- UNIT_ASSERT(config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- }
- }
- );
-
- TestCreatePQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"(
- Name: "Topic2"
- TotalGroupCount: 1
- PartitionPerTablet: 1
- PQTabletConfig {
- PartitionConfig { LifetimeSeconds: 10 }
MeteringMode: METERING_MODE_REQUEST_UNITS
}
)");
- env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
+ env.TestWaitNotification(runtime, txId);
TestDescribeResult(
- DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic2"), {
+ DescribePath(runtime, "/MyRoot/Topic1"), {
NLs::PathExist,
NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig();
@@ -6492,18 +6401,17 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}
);
- // alter topic in a serverless db
- TestAlterPQGroup(runtime, serverlessSchemeShard, ++txId, "/MyRoot/Serverless", R"(
- Name: "Topic2"
+ TestAlterPQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic1"
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
}
)");
- env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
+ env.TestWaitNotification(runtime, txId);
TestDescribeResult(
- DescribePath(runtime, serverlessSchemeShard, "/MyRoot/Serverless/Topic2"), {
+ DescribePath(runtime, "/MyRoot/Topic1"), {
NLs::PathExist,
NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig();
@@ -6513,18 +6421,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}
);
- // try to create topic in a non-serverless db
- TestCreatePQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"(
- Name: "Topic1"
- TotalGroupCount: 1
- PartitionPerTablet: 1
- PQTabletConfig {
- PartitionConfig { LifetimeSeconds: 10 }
- RequestMeteringMode: METERING_MODE_REQUEST_UNITS
- }
- )", {NKikimrScheme::StatusPreconditionFailed});
-
- TestCreatePQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"(
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "Topic2"
TotalGroupCount: 1
PartitionPerTablet: 1
@@ -6532,10 +6429,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
PartitionConfig { LifetimeSeconds: 10 }
}
)");
- env.TestWaitNotification(runtime, txId, sharedSchemeShard);
+ env.TestWaitNotification(runtime, txId);
TestDescribeResult(
- DescribePath(runtime, sharedSchemeShard, "/MyRoot/Shared/Topic2"), {
+ DescribePath(runtime, "/MyRoot/Topic2"), {
NLs::PathExist,
NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
const auto& config = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig();
@@ -6543,15 +6440,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}
}
);
-
- // alter topic in a non-serverless db
- TestAlterPQGroup(runtime, sharedSchemeShard, ++txId, "/MyRoot/Shared", R"(
- Name: "Topic2"
- PQTabletConfig {
- PartitionConfig { LifetimeSeconds: 10 }
- RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY
- }
- )", {NKikimrScheme::StatusPreconditionFailed});
}
Y_UNIT_TEST(DropTable) { //+
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
index 40559602f6f..e8f9910670c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
@@ -93,6 +93,10 @@ public:
AnnoyingClient->UpdateDC(name, local, enabled);
}
+ const NYdb::TDriver& GetDriver() const {
+ return CleverServer->GetDriver();
+ }
+
public:
TSimpleSharedPtr<TPortManager> PortManager;
ui16 Port;
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 9ed0870c1a7..1e7be043d8a 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -139,7 +139,9 @@ namespace NKikimr::NDataStreams::V1 {
pqDescr->SetPartitionPerTablet(1);
// TODO: support StreamMode
- pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ }
TString error;
auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error,
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 138dced9e24..4bf336d16a9 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -849,7 +849,40 @@ namespace NKikimr::NGRpcProxy::V1 {
return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST);
}
+ static bool FillMeteringMode(Ydb::Topic::MeteringMode mode, NKikimrPQ::TPQTabletConfig& config,
+ bool meteringEnabled, bool isAlter, Ydb::StatusIds::StatusCode& code, TString& error)
+ {
+ if (meteringEnabled) {
+ switch (mode) {
+ case Ydb::Topic::METERING_MODE_UNSPECIFIED:
+ if (!isAlter) {
+ config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ }
+ break;
+ case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
+ config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ break;
+ case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY:
+ config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ break;
+ default:
+ code = Ydb::StatusIds::BAD_REQUEST;
+ error = "Unknown metering mode";
+ return false;
+ }
+ } else {
+ switch (mode) {
+ case Ydb::Topic::METERING_MODE_UNSPECIFIED:
+ break;
+ default:
+ code = Ydb::StatusIds::PRECONDITION_FAILED;
+ error = "Metering mode can only be specified in a serverless database";
+ return false;
+ }
+ }
+ return true;
+ }
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
const TString& name, const Ydb::Topic::CreateTopicRequest& request,
@@ -975,19 +1008,9 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- switch (request.metering_mode()) {
- case Ydb::Topic::METERING_MODE_UNSPECIFIED:
- config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- break;
- case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
- config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- break;
- case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY:
- config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
- break;
- default:
- error = "Unknown metering mode";
- return Ydb::StatusIds::BAD_REQUEST;
+ Ydb::StatusIds::StatusCode code;
+ if (!FillMeteringMode(request.metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), false, code, error)) {
+ return code;
}
const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
@@ -1003,8 +1026,6 @@ namespace NKikimr::NGRpcProxy::V1 {
return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST);
}
-
-
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
const Ydb::Topic::AlterTopicRequest& request,
NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx,
@@ -1015,6 +1036,8 @@ namespace NKikimr::NGRpcProxy::V1 {
return Ydb::StatusIds::BAD_REQUEST;\
}
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+
if (request.has_alter_partitioning_settings() && request.alter_partitioning_settings().has_set_min_active_partitions()) {
CHECK_CDC;
auto parts = request.alter_partitioning_settings().set_min_active_partitions();
@@ -1058,7 +1081,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
bool local = true; //todo: check locality
- if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
if (request.has_set_partition_write_speed_bytes_per_second()) {
CHECK_CDC;
auto partSpeed = request.set_partition_write_speed_bytes_per_second();
@@ -1099,18 +1122,9 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- switch (request.set_metering_mode()) {
- case Ydb::Topic::METERING_MODE_UNSPECIFIED:
- break; // do not change
- case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
- config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- break;
- case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY:
- config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
- break;
- default:
- error = "Unknown metering mode";
- return Ydb::StatusIds::BAD_REQUEST;
+ Ydb::StatusIds::StatusCode code;
+ if (!FillMeteringMode(request.set_metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), true, code, error)) {
+ return code;
}
const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 3bee991d2eb..fe23b799d07 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -508,7 +508,9 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
(*result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
(*result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();
- if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+
+ if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
result.set_partition_write_burst_bytes(partConfig.GetBurstSize());
}
@@ -517,7 +519,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}
- if (response.DomainInfo && response.DomainInfo->IsServerless()) {
+ if (pqConfig.GetBillingMeteringConfig().GetEnabled()) {
switch (config.GetMeteringMode()) {
case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY:
result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
@@ -530,7 +532,6 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
}
}
- const auto& pqConfig = AppData(ctx)->PQConfig;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
auto rr = result.add_consumers();
auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 8c6b26b34b8..2e8dd7931f0 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3615,7 +3615,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
ReadRuleVersions: 567
TopicPath: "/Root/PQ/rt3.dc1--acc--topic3"
YdbDatabasePath: "/Root"
- MeteringMode: METERING_MODE_REQUEST_UNITS
}
ErrorCode: OK
}
@@ -4737,6 +4736,98 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
TestReadRuleServiceTypePasswordImpl(true);
}
+ void CreateTopicWithMeteringMode(bool meteringEnabled) {
+ TServerSettings serverSettings = PQSettings(0);
+ serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled);
+ NPersQueue::TTestServer server(serverSettings);
+
+ using namespace NYdb::NTopic;
+ auto client = TTopicClient(server.GetDriver());
+
+ for (const auto mode : {EMeteringMode::RequestUnits, EMeteringMode::ReservedCapacity}) {
+ const TString path = TStringBuilder() << "/Root/PQ/Topic" << mode;
+
+ auto res = client.CreateTopic(path, TCreateTopicSettings()
+ .MeteringMode(mode)
+ ).ExtractValueSync();
+
+ if (!meteringEnabled) {
+ UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED);
+ continue;
+ }
+
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ auto desc = client.DescribeTopic(path).ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), mode);
+ }
+ }
+
+ Y_UNIT_TEST(CreateTopicWithMeteringMode) {
+ CreateTopicWithMeteringMode(false);
+ CreateTopicWithMeteringMode(true);
+ }
+
+ void SetMeteringMode(bool meteringEnabled) {
+ TServerSettings serverSettings = PQSettings(0);
+ serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled);
+ NPersQueue::TTestServer server(serverSettings);
+
+ using namespace NYdb::NTopic;
+ auto client = TTopicClient(server.GetDriver());
+
+ {
+ auto res = client.CreateTopic("/Root/PQ/ttt").ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ for (const auto mode : {EMeteringMode::RequestUnits, EMeteringMode::ReservedCapacity}) {
+ auto res = client.AlterTopic("/Root/PQ/ttt", TAlterTopicSettings()
+ .SetMeteringMode(mode)
+ ).ExtractValueSync();
+
+ if (!meteringEnabled) {
+ UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED);
+ continue;
+ }
+
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ auto desc = client.DescribeTopic("/Root/PQ/ttt").ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), mode);
+ }
+ }
+
+ Y_UNIT_TEST(SetMeteringMode) {
+ SetMeteringMode(false);
+ SetMeteringMode(true);
+ }
+
+ void DefaultMeteringMode(bool meteringEnabled) {
+ TServerSettings serverSettings = PQSettings(0);
+ serverSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ serverSettings.PQConfig.MutableBillingMeteringConfig()->SetEnabled(meteringEnabled);
+ NPersQueue::TTestServer server(serverSettings);
+
+ using namespace NYdb::NTopic;
+ auto client = TTopicClient(server.GetDriver());
+
+ auto res = client.CreateTopic("/Root/PQ/ttt").ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+
+ auto desc = client.DescribeTopic("/Root/PQ/ttt").ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetMeteringMode(), (meteringEnabled
+ ? EMeteringMode::RequestUnits
+ : EMeteringMode::Unspecified));
+ }
+
+ Y_UNIT_TEST(DefaultMeteringMode) {
+ DefaultMeteringMode(false);
+ DefaultMeteringMode(true);
+ }
Y_UNIT_TEST(TClusterTrackerTest) {
APITestSetup setup{TEST_CASE_NAME};