aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-11-23 13:40:00 +0300
committerilnaz <ilnaz@ydb.tech>2022-11-23 13:40:00 +0300
commitd2cba3927c297cd7e6af4cbbc6e581ec29ed066f (patch)
treeb583d9687eed4454893788b479c4b037c607fe09
parent5f627a5e2c2917fd26b1fc318b9d8a8918c8fc74 (diff)
downloadydb-d2cba3927c297cd7e6af4cbbc6e581ec29ed066f.tar.gz
(refactoring) Move retention_period checks to SchemeShard
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp32
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp16
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp21
-rw-r--r--ydb/tests/library/common/protobuf_ss.py3
4 files changed, 31 insertions, 41 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index e89a4de7f99..67803c7740c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -88,6 +88,19 @@ public:
return nullptr;
}
+ if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) {
+ const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds();
+ if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {
+ errStr = TStringBuilder() << "Invalid retention period"
+ << ": specified: " << lifetimeSeconds << "s"
+ << ", min: " << 1 << "s"
+ << ", max: " << TSchemeShard::MaxPQLifetimeSeconds << "s";
+ return nullptr;
+ }
+ } else {
+ alterConfig.MutablePartitionConfig()->SetLifetimeSeconds(tabletConfig->GetPartitionConfig().GetLifetimeSeconds());
+ }
+
if (alterConfig.GetPartitionConfig().ExplicitChannelProfilesSize() > 0) {
// Validate explicit channel profiles alter attempt
const auto& ecps = alterConfig.GetPartitionConfig().GetExplicitChannelProfiles();
@@ -496,19 +509,13 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
- if ((ui32)newTabletConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() > TSchemeShard::MaxPQWriteSpeedPerPartition) {
- errStr = TStringBuilder()
- << "Invalid write speed per second in partition specified: " << newTabletConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()
- << " vs " << TSchemeShard::MaxPQWriteSpeedPerPartition;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
- if ((ui32)newTabletConfig.GetPartitionConfig().GetLifetimeSeconds() > TSchemeShard::MaxPQLifetimeSeconds) {
- errStr = TStringBuilder()
- << "Invalid retention period specified: " << newTabletConfig.GetPartitionConfig().GetLifetimeSeconds()
- << " vs " << TSchemeShard::MaxPQLifetimeSeconds;
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
+ const auto& partConfig = newTabletConfig.GetPartitionConfig();
+
+ if ((ui32)partConfig.GetWriteSpeedInBytesPerSecond() > TSchemeShard::MaxPQWriteSpeedPerPartition) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "Invalid write speed"
+ << ": specified: " << partConfig.GetWriteSpeedInBytesPerSecond() << "bps"
+ << ", max: " << TSchemeShard::MaxPQWriteSpeedPerPartition << "bps");
return result;
}
@@ -562,7 +569,6 @@ public:
// This channel bindings are for PersQueue shards. They either use
// explicit channel profiles, or reuse channel profile above.
- const auto& partConfig = newTabletConfig.GetPartitionConfig();
TChannelsBindings pqChannelsBinding;
if (partConfig.ExplicitChannelProfilesSize() > 0) {
// N.B. no validation necessary at this step
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 8646ab3437e..b0a3bab874e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -53,17 +53,19 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
if ((ui32)op.GetPQTabletConfig().GetPartitionConfig().GetWriteSpeedInBytesPerSecond() > TSchemeShard::MaxPQWriteSpeedPerPartition) {
status = NKikimrScheme::StatusInvalidParameter;
- errStr = TStringBuilder()
- << "Invalid write speed per second in partition specified: " << op.GetPQTabletConfig().GetPartitionConfig().GetWriteSpeedInBytesPerSecond()
- << " vs " << TSchemeShard::MaxPQWriteSpeedPerPartition;
+ errStr = TStringBuilder() << "Invalid write speed"
+ << ": specified: " << op.GetPQTabletConfig().GetPartitionConfig().GetWriteSpeedInBytesPerSecond() << "bps"
+ << ", max: " << TSchemeShard::MaxPQWriteSpeedPerPartition << "bps";
return nullptr;
}
- if ((ui32)op.GetPQTabletConfig().GetPartitionConfig().GetLifetimeSeconds() > TSchemeShard::MaxPQLifetimeSeconds) {
+ const auto lifetimeSeconds = op.GetPQTabletConfig().GetPartitionConfig().GetLifetimeSeconds();
+ if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {
status = NKikimrScheme::StatusInvalidParameter;
- errStr = TStringBuilder()
- << "Invalid retention period specified: " << op.GetPQTabletConfig().GetPartitionConfig().GetLifetimeSeconds()
- << " vs " << TSchemeShard::MaxPQLifetimeSeconds;
+ errStr = TStringBuilder() << "Invalid retention period"
+ << ": specified: " << lifetimeSeconds << "s"
+ << ", min: " << 1 << "s"
+ << ", max: " << TSchemeShard::MaxPQLifetimeSeconds << "s";
return nullptr;
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 5934f0aa0b3..f080b3f99a2 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -638,11 +638,6 @@ namespace NKikimr::NGRpcProxy::V1 {
switch (settings.retention_case()) {
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
- if (settings.retention_period_ms() <= 0) {
- error = TStringBuilder() << "retention_period_ms must be positive, provided " <<
- settings.retention_period_ms();
- return Ydb::StatusIds::BAD_REQUEST;
- }
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
}
break;
@@ -956,11 +951,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles);
}
if (request.has_retention_period()) {
- if (request.retention_period().seconds() <= 0) {
- error = TStringBuilder() << "retention_period must be not negative, provided " <<
- request.retention_period().DebugString();
- return Ydb::StatusIds::BAD_REQUEST;
- }
partConfig->SetLifetimeSeconds(request.retention_period().seconds());
} else {
partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
@@ -1060,16 +1050,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (request.has_set_retention_period()) {
CHECK_CDC;
- if (request.set_retention_period().seconds() < 0) {
- error = TStringBuilder() << "retention_period must be not negative, provided " <<
- request.set_retention_period().DebugString();
- return Ydb::StatusIds::BAD_REQUEST;
- }
- if (request.set_retention_period().seconds() > 0) {
- partConfig->SetLifetimeSeconds(request.set_retention_period().seconds());
- } else {
- partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
- }
+ partConfig->SetLifetimeSeconds(request.set_retention_period().seconds());
}
diff --git a/ydb/tests/library/common/protobuf_ss.py b/ydb/tests/library/common/protobuf_ss.py
index 90b0638eb41..ae905bad459 100644
--- a/ydb/tests/library/common/protobuf_ss.py
+++ b/ydb/tests/library/common/protobuf_ss.py
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import itertools
import string
+from datetime import timedelta
from os.path import basename, dirname, join
from ydb.core.protos import msgbus_pb2
@@ -330,7 +331,7 @@ class CreateTopicRequest(AbstractTSchemeOperationRequest):
self.__important_client_ids = None
self.__max_count_in_partition = None
self.__max_size_in_partition = None
- self.__lifetime_seconds = 0
+ self.__lifetime_seconds = int(timedelta(days=1).total_seconds())
@property
def partitions_count(self):