diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-09-01 21:44:08 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-09-01 21:44:08 +0300 |
commit | 121300378e97ff20cb747e463dca98607d595e7e (patch) | |
tree | d69c5239a7664fd6dd63fbc661212197061fdf00 | |
parent | f64eb8ed70d8ec6b2761cd046be5a9ecfce78d07 (diff) | |
download | ydb-121300378e97ff20cb747e463dca98607d595e7e.tar.gz |
Default metering mode for Datastreams api
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp | 30 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_base.cpp | 9 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 2 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 1 |
8 files changed, 43 insertions, 32 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index fff180ae2c6..556cf19a72b 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -309,7 +309,10 @@ message TPQTabletConfig { METERING_MODE_RESERVED_CAPACITY = 0; METERING_MODE_REQUEST_UNITS = 1; } + // Metering mode used in config optional EMeteringMode MeteringMode = 34; + // Metering mode passed in the request + optional EMeteringMode RequestMeteringMode = 35; } message TMessageGroupInfo { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index b4d35ca64a4..1eaa8cb3f63 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -75,10 +75,10 @@ public: } TPersQueueGroupInfo::TPtr ParseParams( - TOperationContext& context, + TOperationContext& context, bool isServerlessDomain, NKikimrPQ::TPQTabletConfig* tabletConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& alter, - TString& errStr) + TEvSchemeShard::EStatus& status, TString& errStr) { TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo(); const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize(); @@ -113,6 +113,17 @@ public: return nullptr; } + if (alterConfig.HasRequestMeteringMode()) { + if (!isServerlessDomain) { + status = NKikimrScheme::StatusPreconditionFailed; + errStr = "Metering mode can only be specified in a serverless domain"; + return nullptr; + } else { + alterConfig.SetMeteringMode(alterConfig.GetRequestMeteringMode()); + alterConfig.ClearRequestMeteringMode(); + } + } + if (alterConfig.GetPartitionConfig().ExplicitChannelProfilesSize() > 0) { // Validate explicit channel profiles alter attempt const auto& ecps = alterConfig.GetPartitionConfig().GetExplicitChannelProfiles(); @@ -455,9 +466,13 @@ public: } newTabletConfig = tabletConfig; - TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); + const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS); + TEvSchemeShard::EStatus parseStatus = NKikimrScheme::StatusInvalidParameter; + TPersQueueGroupInfo::TPtr alterData = ParseParams( + context, context.SS->IsServerlessDomain(domainPath), &newTabletConfig, alter, parseStatus, errStr); + if (!alterData) { - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); + result->SetError(parseStatus, errStr); return result; } @@ -539,13 +554,6 @@ public: return result; } - const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS); - if (newTabletConfig.HasMeteringMode() && !context.SS->IsServerlessDomain(domainPath)) { - errStr = "Metering mode can only be specified for topic in serverless domain"; - result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); - return result; - } - auto getStorageLimit = [](auto &config, ui64 throughput) { if (config.GetPartitionConfig().HasStorageLimitBytes()) { return config.GetPartitionConfig().GetStorageLimitBytes(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index d267a722e6a..ae1bab311e8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -663,11 +663,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx auto& pqConfig = *desc.MutablePQTabletConfig(); pqConfig.SetTopicName(streamName); pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString()); - - const auto domainPath = TPath::Init(tablePath.GetPathIdForDomain(), context.SS); - if (context.SS->IsServerlessDomain(domainPath)) { - pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - } + pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); auto& partitionConfig = *pqConfig.MutablePartitionConfig(); partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index 43ecff4d1ca..613b6da6e45 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -147,15 +147,14 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool return nullptr; } - if (isServerlessDomain) { - if (!tabletConfig.HasMeteringMode()) { - tabletConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); - } - } else { - if (tabletConfig.HasMeteringMode()) { + if (tabletConfig.HasRequestMeteringMode()) { + if (!isServerlessDomain) { status = NKikimrScheme::StatusPreconditionFailed; - errStr = "Metering mode can only be specified for topic in serverless domain"; + errStr = "Metering mode can only be specified in a serverless domain"; return nullptr; + } else { + tabletConfig.SetMeteringMode(tabletConfig.GetRequestMeteringMode()); + tabletConfig.ClearRequestMeteringMode(); } } diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp index c72e387c22e..f298bd1c7b8 100644 --- a/ydb/core/tx/schemeshard/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base.cpp @@ -6454,7 +6454,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { PartitionPerTablet: 1 PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - MeteringMode: METERING_MODE_REQUEST_UNITS + RequestMeteringMode: METERING_MODE_REQUEST_UNITS } )"); env.TestWaitNotification(runtime, txId, serverlessSchemeShard); @@ -6476,6 +6476,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { PartitionPerTablet: 1 PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } + MeteringMode: METERING_MODE_REQUEST_UNITS } )"); env.TestWaitNotification(runtime, txId, serverlessSchemeShard); @@ -6496,7 +6497,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { Name: "Topic2" PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - MeteringMode: METERING_MODE_RESERVED_CAPACITY + RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY } )"); env.TestWaitNotification(runtime, txId, serverlessSchemeShard); @@ -6519,7 +6520,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { PartitionPerTablet: 1 PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - MeteringMode: METERING_MODE_REQUEST_UNITS + RequestMeteringMode: METERING_MODE_REQUEST_UNITS } )", {NKikimrScheme::StatusPreconditionFailed}); @@ -6548,7 +6549,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { Name: "Topic2" PQTabletConfig { PartitionConfig { LifetimeSeconds: 10 } - MeteringMode: METERING_MODE_RESERVED_CAPACITY + RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY } )", {NKikimrScheme::StatusPreconditionFailed}); } diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 32a67bc12ef..9ed0870c1a7 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -138,6 +138,8 @@ namespace NKikimr::NDataStreams::V1 { modifyScheme.SetWorkingDir(workingDir); pqDescr->SetPartitionPerTablet(1); + // TODO: support StreamMode + pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); TString error; auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error, diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 3ddf9c5d030..138dced9e24 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -977,12 +977,13 @@ namespace NKikimr::NGRpcProxy::V1 { switch (request.metering_mode()) { case Ydb::Topic::METERING_MODE_UNSPECIFIED: - break; // schemeshard will set the default value if necessary - case Ydb::Topic::METERING_MODE_REQUEST_UNITS: config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); break; + case Ydb::Topic::METERING_MODE_REQUEST_UNITS: + config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + break; case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY: - config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); break; default: error = "Unknown metering mode"; @@ -1102,10 +1103,10 @@ namespace NKikimr::NGRpcProxy::V1 { case Ydb::Topic::METERING_MODE_UNSPECIFIED: break; // do not change case Ydb::Topic::METERING_MODE_REQUEST_UNITS: - config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); break; case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY: - config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); break; default: error = "Unknown metering mode"; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 0604c677490..8c6b26b34b8 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3615,6 +3615,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" + MeteringMode: METERING_MODE_REQUEST_UNITS } ErrorCode: OK } |