aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-01 21:44:08 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-01 21:44:08 +0300
commit121300378e97ff20cb747e463dca98607d595e7e (patch)
treed69c5239a7664fd6dd63fbc661212197061fdf00
parentf64eb8ed70d8ec6b2761cd046be5a9ecfce78d07 (diff)
downloadydb-121300378e97ff20cb747e463dca98607d595e7e.tar.gz
Default metering mode for Datastreams api
-rw-r--r--ydb/core/protos/pqconfig.proto3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp30
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp13
-rw-r--r--ydb/core/tx/schemeshard/ut_base.cpp9
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp2
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp11
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp1
8 files changed, 43 insertions, 32 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index fff180ae2c6..556cf19a72b 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -309,7 +309,10 @@ message TPQTabletConfig {
METERING_MODE_RESERVED_CAPACITY = 0;
METERING_MODE_REQUEST_UNITS = 1;
}
+ // Metering mode used in config
optional EMeteringMode MeteringMode = 34;
+ // Metering mode passed in the request
+ optional EMeteringMode RequestMeteringMode = 35;
}
message TMessageGroupInfo {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index b4d35ca64a4..1eaa8cb3f63 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -75,10 +75,10 @@ public:
}
TPersQueueGroupInfo::TPtr ParseParams(
- TOperationContext& context,
+ TOperationContext& context, bool isServerlessDomain,
NKikimrPQ::TPQTabletConfig* tabletConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& alter,
- TString& errStr)
+ TEvSchemeShard::EStatus& status, TString& errStr)
{
TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo();
const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize();
@@ -113,6 +113,17 @@ public:
return nullptr;
}
+ if (alterConfig.HasRequestMeteringMode()) {
+ if (!isServerlessDomain) {
+ status = NKikimrScheme::StatusPreconditionFailed;
+ errStr = "Metering mode can only be specified in a serverless domain";
+ return nullptr;
+ } else {
+ alterConfig.SetMeteringMode(alterConfig.GetRequestMeteringMode());
+ alterConfig.ClearRequestMeteringMode();
+ }
+ }
+
if (alterConfig.GetPartitionConfig().ExplicitChannelProfilesSize() > 0) {
// Validate explicit channel profiles alter attempt
const auto& ecps = alterConfig.GetPartitionConfig().GetExplicitChannelProfiles();
@@ -455,9 +466,13 @@ public:
}
newTabletConfig = tabletConfig;
- TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
+ const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS);
+ TEvSchemeShard::EStatus parseStatus = NKikimrScheme::StatusInvalidParameter;
+ TPersQueueGroupInfo::TPtr alterData = ParseParams(
+ context, context.SS->IsServerlessDomain(domainPath), &newTabletConfig, alter, parseStatus, errStr);
+
if (!alterData) {
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
+ result->SetError(parseStatus, errStr);
return result;
}
@@ -539,13 +554,6 @@ public:
return result;
}
- const auto domainPath = TPath::Init(path.GetPathIdForDomain(), context.SS);
- if (newTabletConfig.HasMeteringMode() && !context.SS->IsServerlessDomain(domainPath)) {
- errStr = "Metering mode can only be specified for topic in serverless domain";
- result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
- return result;
- }
-
auto getStorageLimit = [](auto &config, ui64 throughput) {
if (config.GetPartitionConfig().HasStorageLimitBytes()) {
return config.GetPartitionConfig().GetStorageLimitBytes();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index d267a722e6a..ae1bab311e8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -663,11 +663,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
auto& pqConfig = *desc.MutablePQTabletConfig();
pqConfig.SetTopicName(streamName);
pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString());
-
- const auto domainPath = TPath::Init(tablePath.GetPathIdForDomain(), context.SS);
- if (context.SS->IsServerlessDomain(domainPath)) {
- pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- }
+ pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
auto& partitionConfig = *pqConfig.MutablePartitionConfig();
partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 43ecff4d1ca..613b6da6e45 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -147,15 +147,14 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, bool
return nullptr;
}
- if (isServerlessDomain) {
- if (!tabletConfig.HasMeteringMode()) {
- tabletConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
- }
- } else {
- if (tabletConfig.HasMeteringMode()) {
+ if (tabletConfig.HasRequestMeteringMode()) {
+ if (!isServerlessDomain) {
status = NKikimrScheme::StatusPreconditionFailed;
- errStr = "Metering mode can only be specified for topic in serverless domain";
+ errStr = "Metering mode can only be specified in a serverless domain";
return nullptr;
+ } else {
+ tabletConfig.SetMeteringMode(tabletConfig.GetRequestMeteringMode());
+ tabletConfig.ClearRequestMeteringMode();
}
}
diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp
index c72e387c22e..f298bd1c7b8 100644
--- a/ydb/core/tx/schemeshard/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base.cpp
@@ -6454,7 +6454,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- MeteringMode: METERING_MODE_REQUEST_UNITS
+ RequestMeteringMode: METERING_MODE_REQUEST_UNITS
}
)");
env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
@@ -6476,6 +6476,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
+ MeteringMode: METERING_MODE_REQUEST_UNITS
}
)");
env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
@@ -6496,7 +6497,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
Name: "Topic2"
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY
}
)");
env.TestWaitNotification(runtime, txId, serverlessSchemeShard);
@@ -6519,7 +6520,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- MeteringMode: METERING_MODE_REQUEST_UNITS
+ RequestMeteringMode: METERING_MODE_REQUEST_UNITS
}
)", {NKikimrScheme::StatusPreconditionFailed});
@@ -6548,7 +6549,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
Name: "Topic2"
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
- MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ RequestMeteringMode: METERING_MODE_RESERVED_CAPACITY
}
)", {NKikimrScheme::StatusPreconditionFailed});
}
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 32a67bc12ef..9ed0870c1a7 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -138,6 +138,8 @@ namespace NKikimr::NDataStreams::V1 {
modifyScheme.SetWorkingDir(workingDir);
pqDescr->SetPartitionPerTablet(1);
+ // TODO: support StreamMode
+ pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
TString error;
auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error,
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 3ddf9c5d030..138dced9e24 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -977,12 +977,13 @@ namespace NKikimr::NGRpcProxy::V1 {
switch (request.metering_mode()) {
case Ydb::Topic::METERING_MODE_UNSPECIFIED:
- break; // schemeshard will set the default value if necessary
- case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
break;
+ case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
+ config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ break;
case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY:
- config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
break;
default:
error = "Unknown metering mode";
@@ -1102,10 +1103,10 @@ namespace NKikimr::NGRpcProxy::V1 {
case Ydb::Topic::METERING_MODE_UNSPECIFIED:
break; // do not change
case Ydb::Topic::METERING_MODE_REQUEST_UNITS:
- config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
break;
case Ydb::Topic::METERING_MODE_RESERVED_CAPACITY:
- config->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ config->SetRequestMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
break;
default:
error = "Unknown metering mode";
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 0604c677490..8c6b26b34b8 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3615,6 +3615,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
ReadRuleVersions: 567
TopicPath: "/Root/PQ/rt3.dc1--acc--topic3"
YdbDatabasePath: "/Root"
+ MeteringMode: METERING_MODE_REQUEST_UNITS
}
ErrorCode: OK
}