diff options
author | alexbogo <alexbogo@ydb.tech> | 2022-10-21 17:52:53 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2022-10-21 17:52:53 +0300 |
commit | 54c2e75b23f012b13d7e89a41a6042c60e105681 (patch) | |
tree | 0cad063bc11d09dc2ce56e8deb6d4322de9bb64c | |
parent | ebcdbbcef81f8477ceb78a552f8b171b987f3e40 (diff) | |
download | ydb-54c2e75b23f012b13d7e89a41a6042c60e105681.tar.gz |
[ymq] add fields CreatedTimestamp and CustomQueueName for private CreateQueueRequest
add_file
init
-rw-r--r-- | ydb/core/protos/sqs.proto | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/create_queue.cpp | 35 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.cpp | 9 | ||||
-rw-r--r-- | ydb/core/ymq/http/http.cpp | 21 | ||||
-rw-r--r-- | ydb/core/ymq/http/params.h | 3 | ||||
-rw-r--r-- | ydb/core/ymq/http/parser.rl6 | 2 | ||||
-rw-r--r-- | ydb/tests/library/sqs/requests_client.py | 8 |
7 files changed, 73 insertions, 10 deletions
diff --git a/ydb/core/protos/sqs.proto b/ydb/core/protos/sqs.proto index eae557a624..45dee72b1a 100644 --- a/ydb/core/protos/sqs.proto +++ b/ydb/core/protos/sqs.proto @@ -92,6 +92,11 @@ message TCreateQueueRequest { optional uint64 SizeToSplit = 9 [default = 1073741824]; // 1GB /// Enable internal tables' transactions out of order execution. optional bool EnableOutOfOrderTransactionsExecution = 6 [default = true]; + + /// Custom name of queue. + optional string CustomQueueName = 10; + /// Created time of queue + optional uint64 CreatedTimestamp = 11; } message TCreateQueueResponse { diff --git a/ydb/core/ymq/actor/create_queue.cpp b/ydb/core/ymq/actor/create_queue.cpp index 2d4b3af214..c5311ffd34 100644 --- a/ydb/core/ymq/actor/create_queue.cpp +++ b/ydb/core/ymq/actor/create_queue.cpp @@ -34,7 +34,8 @@ public: } protected: bool IsFifoQueue() const override { - return AsciiHasSuffixIgnoreCase(Request().GetQueueName(), ".fifo"); // works for cloud too, since the custom name should end with '.fifo' + const TString& name = Request().GetCustomQueueName() ? Request().GetCustomQueueName() : Request().GetQueueName(); + return AsciiHasSuffixIgnoreCase(name, ".fifo"); // works for cloud too, since the custom name should end with '.fifo' } private: @@ -76,6 +77,25 @@ private: return false; } + if (Request().HasCustomQueueName()) { + if (IsCloud()) { + if (!ValidateQueueNameOrUserName(Request().GetCustomQueueName())) { + MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Invalid custom queue name."); + return false; + } + } else { + if (!Request().GetCustomQueueName().empty()) { + MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Custom queue name must be empty or unset."); + return false; + } + } + } + + if (Request().HasCreatedTimestamp() && Request().GetCreatedTimestamp() > TActivationContext::AsActorContext().Now().Seconds()) { + MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Invalid created timestamp."); + return false; + } + if (Request().GetShards() > MAX_SHARDS_COUNT) { MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Too many shards."); return false; @@ -111,11 +131,14 @@ private: Become(&TThis::StateFunc); if (IsCloud()) { - Register(new TAtomicCounterActor(SelfId(), Cfg().GetRoot(), RequestId_)); + if (Request().GetCustomQueueName()) { + ResourceId_ = Request().GetQueueName(); + StartQueueCreation(Request().GetQueueName(), UserName_, Request().GetCustomQueueName()); + } else { + Register(new TAtomicCounterActor(SelfId(), Cfg().GetRoot(), RequestId_)); + } } else { - static const TString emptyCustomQueueName = ""; - - StartQueueCreation(Request().GetQueueName(), UserName_, emptyCustomQueueName); + StartQueueCreation(Request().GetQueueName(), UserName_, Request().GetCustomQueueName()); } } @@ -167,7 +190,7 @@ private: case EQueueState::Active: if (event->Success) { - const TString& name = Request().GetQueueName(); + const TString& name = Request().GetCustomQueueName() ? Request().GetCustomQueueName() : Request().GetQueueName(); if (IsCloud()) { const auto finalResourceId = event->AlreadyExists ? event->ExistingQueueResourceId : ResourceId_; result->SetQueueName(finalResourceId); diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp index 92621a4f27..b25b7bde05 100644 --- a/ydb/core/ymq/actor/queue_schema.cpp +++ b/ydb/core/ymq/actor/queue_schema.cpp @@ -713,6 +713,7 @@ static const char* const CommitQueueParamsQuery = R"__( (let fifo (Parameter 'FIFO (DataType 'Bool))) (let contentBasedDeduplication (Parameter 'CONTENT_BASED_DEDUPLICATION (DataType 'Bool))) (let now (Parameter 'NOW (DataType 'Uint64))) + (let createdTimestamp (Parameter 'CREATED_TIMESTAMP (DataType 'Uint64))) (let shards (Parameter 'SHARDS (DataType 'Uint64))) (let partitions (Parameter 'PARTITIONS (DataType 'Uint64))) (let masterTabletId (Parameter 'MASTER_TABLET_ID (DataType 'Uint64))) @@ -823,7 +824,7 @@ static const char* const CommitQueueParamsQuery = R"__( '('QueueState (Uint64 '3)) '('FifoQueue fifo) '('DeadLetterQueue (Bool 'false)) - '('CreatedTimestamp now) + '('CreatedTimestamp createdTimestamp) '('Shards shards) '('Partitions partitions) '('Version queueIdNumber) @@ -857,7 +858,7 @@ static const char* const CommitQueueParamsQuery = R"__( (let stateUpdate '( '('CleanupTimestamp now) - '('CreatedTimestamp now) + '('CreatedTimestamp createdTimestamp) '('LastModifiedTimestamp now) '('InflyCount (Int64 '0)) '('MessageCount (Int64 '0)) @@ -891,7 +892,7 @@ static const char* const CommitQueueParamsQuery = R"__( (let row '(%5$s)) (let update '( '('CleanupTimestamp now) - '('CreatedTimestamp now) + '('CreatedTimestamp createdTimestamp) '('LastModifiedTimestamp now) '('InflyCount (Int64 '0)) '('MessageCount (Int64 '0)) @@ -962,6 +963,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() { auto ev = MakeExecuteEvent(query); auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction(); Y_VERIFY(TablesFormat_ == 1 || LeaderTabletId_ != 0); + TInstant createdTimestamp = Request_.HasCreatedTimestamp() ? TInstant::Seconds(Request_.GetCreatedTimestamp()) : QueueCreationTimestamp_; TParameters(trans->MutableParams()->MutableProto()) .Utf8("NAME", QueuePath_.QueueName) .Utf8("CUSTOMNAME", CustomQueueName_) @@ -970,6 +972,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() { .Bool("FIFO", IsFifo_) .Bool("CONTENT_BASED_DEDUPLICATION", *ValidatedAttributes_.ContentBasedDeduplication) .Uint64("NOW", QueueCreationTimestamp_.MilliSeconds()) + .Uint64("CREATED_TIMESTAMP", createdTimestamp.MilliSeconds()) .Uint64("SHARDS", RequiredShardsCount_) .Uint64("PARTITIONS", Request_.GetPartitions()) .Uint64("MASTER_TABLET_ID", LeaderTabletId_) diff --git a/ydb/core/ymq/http/http.cpp b/ydb/core/ymq/http/http.cpp index 206ec6de56..d9c220f9aa 100644 --- a/ydb/core/ymq/http/http.cpp +++ b/ydb/core/ymq/http/http.cpp @@ -486,10 +486,24 @@ void THttpRequest::ParseRequest(THttpInput& input) { break; \ } +bool HasPrivateActionParams(EAction action, const TParameters& params) { + if (action == EAction::CreateQueue) { + return params.CreateTimestampSeconds || params.CustomQueueName; + } + return false; +} + bool THttpRequest::SetupRequest() { auto requestHolder = MakeHolder<TSqsRequest>(); requestHolder->SetRequestId(RequestId_); + if (HasPrivateActionParams(Action_, QueryParams_) && !IsPrivateRequest_) { + RLOG_SQS_BASE_ERROR( + *Parent_->ActorSystem_, + "Attempt to call private " << Action_ << " action format without private url path" + ); + throw TSQSException(NErrors::INVALID_ACTION); + } // Validate batches if (IsBatchAction(Action_)) { if (QueryParams_.BatchEntries.empty()) { @@ -608,6 +622,13 @@ void THttpRequest::SetupCreateQueue(TCreateQueueRequest* const req) { req->SetQueueName(QueueName_); req->MutableAuth()->SetUserName(UserName_); + if (QueryParams_.CreateTimestampSeconds) { + req->SetCreatedTimestamp(QueryParams_.CreateTimestampSeconds.GetRef()); + } + if (QueryParams_.CustomQueueName) { + req->SetCustomQueueName(QueryParams_.CustomQueueName.GetRef()); + } + for (const auto& attr : QueryParams_.Attributes) { req->AddAttributes()->CopyFrom(attr.second); } diff --git a/ydb/core/ymq/http/params.h b/ydb/core/ymq/http/params.h index 7834fc4f0f..82aafc962c 100644 --- a/ydb/core/ymq/http/params.h +++ b/ydb/core/ymq/http/params.h @@ -30,6 +30,9 @@ struct TParameters { TMaybe<TString> Version; TMaybe<ui64> VisibilityTimeout; TMaybe<ui64> WaitTimeSeconds; + TMaybe<ui64> CreateTimestampSeconds; + TMaybe<TString> CustomQueueName; + TMap<int, TString> AttributeNames; TMap<int, TAttribute> Attributes; diff --git a/ydb/core/ymq/http/parser.rl6 b/ydb/core/ymq/http/parser.rl6 index b617c334a3..3d50ec6a65 100644 --- a/ydb/core/ymq/http/parser.rl6 +++ b/ydb/core/ymq/http/parser.rl6 @@ -136,6 +136,8 @@ main := |* ('Version') { CurrentParams_->Version = value; }; ('VisibilityTimeout') { CurrentParams_->VisibilityTimeout = ParseAndValidate(value, TStringBuf("VisibilityTimeout")); }; ('WaitTimeSeconds') { CurrentParams_->WaitTimeSeconds = ParseAndValidate(value, TStringBuf("WaitTimeSeconds")); }; + ('CreatedTimestamp') { CurrentParams_->CreateTimestampSeconds = ParseAndValidate(value, TStringBuf("CreatedTimestamp")); }; + ('CustomQueueName') { CurrentParams_->CustomQueueName = ParseAndValidate(value, TStringBuf("CustomQueueName")); }; attribute; attribute_name; diff --git a/ydb/tests/library/sqs/requests_client.py b/ydb/tests/library/sqs/requests_client.py index 6b7545503f..8a9f1648bf 100644 --- a/ydb/tests/library/sqs/requests_client.py +++ b/ydb/tests/library/sqs/requests_client.py @@ -198,7 +198,7 @@ class SqsHttpApi(object): extract_result_method=lambda x: x['CountQueuesResponse']['CountQueuesResult']['Count'], ) - def create_queue(self, queue_name, is_fifo=False, attributes=None): + def create_queue(self, queue_name, is_fifo=False, attributes=None, private_api=False, created_timestamp_sec=None, custom_name=None): # if is_fifo and not queue_name.endswith('.fifo'): # return None if attributes is None: @@ -207,12 +207,18 @@ class SqsHttpApi(object): attributes = dict(attributes) # copy attributes['FifoQueue'] = 'true' params = {} + if created_timestamp_sec is not None: + params['CreatedTimestamp'] = created_timestamp_sec + if custom_name is not None: + params['CustomQueueName'] = custom_name + for i, (k, v) in enumerate(attributes.items()): params['Attribute.{id}.Name'.format(id=i+1)] = k params['Attribute.{id}.Value'.format(id=i + 1)] = v return self.execute_request( action='CreateQueue', + private=private_api, extract_result_method=lambda x: x['CreateQueueResponse']['CreateQueueResult']['QueueUrl'], QueueName=queue_name, **params |