aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2022-10-21 17:52:53 +0300
committeralexbogo <alexbogo@ydb.tech>2022-10-21 17:52:53 +0300
commit54c2e75b23f012b13d7e89a41a6042c60e105681 (patch)
tree0cad063bc11d09dc2ce56e8deb6d4322de9bb64c
parentebcdbbcef81f8477ceb78a552f8b171b987f3e40 (diff)
downloadydb-54c2e75b23f012b13d7e89a41a6042c60e105681.tar.gz
[ymq] add fields CreatedTimestamp and CustomQueueName for private CreateQueueRequest
add_file init
-rw-r--r--ydb/core/protos/sqs.proto5
-rw-r--r--ydb/core/ymq/actor/create_queue.cpp35
-rw-r--r--ydb/core/ymq/actor/queue_schema.cpp9
-rw-r--r--ydb/core/ymq/http/http.cpp21
-rw-r--r--ydb/core/ymq/http/params.h3
-rw-r--r--ydb/core/ymq/http/parser.rl62
-rw-r--r--ydb/tests/library/sqs/requests_client.py8
7 files changed, 73 insertions, 10 deletions
diff --git a/ydb/core/protos/sqs.proto b/ydb/core/protos/sqs.proto
index eae557a624..45dee72b1a 100644
--- a/ydb/core/protos/sqs.proto
+++ b/ydb/core/protos/sqs.proto
@@ -92,6 +92,11 @@ message TCreateQueueRequest {
optional uint64 SizeToSplit = 9 [default = 1073741824]; // 1GB
/// Enable internal tables' transactions out of order execution.
optional bool EnableOutOfOrderTransactionsExecution = 6 [default = true];
+
+ /// Custom name of queue.
+ optional string CustomQueueName = 10;
+ /// Created time of queue
+ optional uint64 CreatedTimestamp = 11;
}
message TCreateQueueResponse {
diff --git a/ydb/core/ymq/actor/create_queue.cpp b/ydb/core/ymq/actor/create_queue.cpp
index 2d4b3af214..c5311ffd34 100644
--- a/ydb/core/ymq/actor/create_queue.cpp
+++ b/ydb/core/ymq/actor/create_queue.cpp
@@ -34,7 +34,8 @@ public:
}
protected:
bool IsFifoQueue() const override {
- return AsciiHasSuffixIgnoreCase(Request().GetQueueName(), ".fifo"); // works for cloud too, since the custom name should end with '.fifo'
+ const TString& name = Request().GetCustomQueueName() ? Request().GetCustomQueueName() : Request().GetQueueName();
+ return AsciiHasSuffixIgnoreCase(name, ".fifo"); // works for cloud too, since the custom name should end with '.fifo'
}
private:
@@ -76,6 +77,25 @@ private:
return false;
}
+ if (Request().HasCustomQueueName()) {
+ if (IsCloud()) {
+ if (!ValidateQueueNameOrUserName(Request().GetCustomQueueName())) {
+ MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Invalid custom queue name.");
+ return false;
+ }
+ } else {
+ if (!Request().GetCustomQueueName().empty()) {
+ MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Custom queue name must be empty or unset.");
+ return false;
+ }
+ }
+ }
+
+ if (Request().HasCreatedTimestamp() && Request().GetCreatedTimestamp() > TActivationContext::AsActorContext().Now().Seconds()) {
+ MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Invalid created timestamp.");
+ return false;
+ }
+
if (Request().GetShards() > MAX_SHARDS_COUNT) {
MakeError(result, NErrors::INVALID_PARAMETER_VALUE, "Too many shards.");
return false;
@@ -111,11 +131,14 @@ private:
Become(&TThis::StateFunc);
if (IsCloud()) {
- Register(new TAtomicCounterActor(SelfId(), Cfg().GetRoot(), RequestId_));
+ if (Request().GetCustomQueueName()) {
+ ResourceId_ = Request().GetQueueName();
+ StartQueueCreation(Request().GetQueueName(), UserName_, Request().GetCustomQueueName());
+ } else {
+ Register(new TAtomicCounterActor(SelfId(), Cfg().GetRoot(), RequestId_));
+ }
} else {
- static const TString emptyCustomQueueName = "";
-
- StartQueueCreation(Request().GetQueueName(), UserName_, emptyCustomQueueName);
+ StartQueueCreation(Request().GetQueueName(), UserName_, Request().GetCustomQueueName());
}
}
@@ -167,7 +190,7 @@ private:
case EQueueState::Active:
if (event->Success) {
- const TString& name = Request().GetQueueName();
+ const TString& name = Request().GetCustomQueueName() ? Request().GetCustomQueueName() : Request().GetQueueName();
if (IsCloud()) {
const auto finalResourceId = event->AlreadyExists ? event->ExistingQueueResourceId : ResourceId_;
result->SetQueueName(finalResourceId);
diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp
index 92621a4f27..b25b7bde05 100644
--- a/ydb/core/ymq/actor/queue_schema.cpp
+++ b/ydb/core/ymq/actor/queue_schema.cpp
@@ -713,6 +713,7 @@ static const char* const CommitQueueParamsQuery = R"__(
(let fifo (Parameter 'FIFO (DataType 'Bool)))
(let contentBasedDeduplication (Parameter 'CONTENT_BASED_DEDUPLICATION (DataType 'Bool)))
(let now (Parameter 'NOW (DataType 'Uint64)))
+ (let createdTimestamp (Parameter 'CREATED_TIMESTAMP (DataType 'Uint64)))
(let shards (Parameter 'SHARDS (DataType 'Uint64)))
(let partitions (Parameter 'PARTITIONS (DataType 'Uint64)))
(let masterTabletId (Parameter 'MASTER_TABLET_ID (DataType 'Uint64)))
@@ -823,7 +824,7 @@ static const char* const CommitQueueParamsQuery = R"__(
'('QueueState (Uint64 '3))
'('FifoQueue fifo)
'('DeadLetterQueue (Bool 'false))
- '('CreatedTimestamp now)
+ '('CreatedTimestamp createdTimestamp)
'('Shards shards)
'('Partitions partitions)
'('Version queueIdNumber)
@@ -857,7 +858,7 @@ static const char* const CommitQueueParamsQuery = R"__(
(let stateUpdate '(
'('CleanupTimestamp now)
- '('CreatedTimestamp now)
+ '('CreatedTimestamp createdTimestamp)
'('LastModifiedTimestamp now)
'('InflyCount (Int64 '0))
'('MessageCount (Int64 '0))
@@ -891,7 +892,7 @@ static const char* const CommitQueueParamsQuery = R"__(
(let row '(%5$s))
(let update '(
'('CleanupTimestamp now)
- '('CreatedTimestamp now)
+ '('CreatedTimestamp createdTimestamp)
'('LastModifiedTimestamp now)
'('InflyCount (Int64 '0))
'('MessageCount (Int64 '0))
@@ -962,6 +963,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() {
auto ev = MakeExecuteEvent(query);
auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction();
Y_VERIFY(TablesFormat_ == 1 || LeaderTabletId_ != 0);
+ TInstant createdTimestamp = Request_.HasCreatedTimestamp() ? TInstant::Seconds(Request_.GetCreatedTimestamp()) : QueueCreationTimestamp_;
TParameters(trans->MutableParams()->MutableProto())
.Utf8("NAME", QueuePath_.QueueName)
.Utf8("CUSTOMNAME", CustomQueueName_)
@@ -970,6 +972,7 @@ void TCreateQueueSchemaActorV2::CommitNewVersion() {
.Bool("FIFO", IsFifo_)
.Bool("CONTENT_BASED_DEDUPLICATION", *ValidatedAttributes_.ContentBasedDeduplication)
.Uint64("NOW", QueueCreationTimestamp_.MilliSeconds())
+ .Uint64("CREATED_TIMESTAMP", createdTimestamp.MilliSeconds())
.Uint64("SHARDS", RequiredShardsCount_)
.Uint64("PARTITIONS", Request_.GetPartitions())
.Uint64("MASTER_TABLET_ID", LeaderTabletId_)
diff --git a/ydb/core/ymq/http/http.cpp b/ydb/core/ymq/http/http.cpp
index 206ec6de56..d9c220f9aa 100644
--- a/ydb/core/ymq/http/http.cpp
+++ b/ydb/core/ymq/http/http.cpp
@@ -486,10 +486,24 @@ void THttpRequest::ParseRequest(THttpInput& input) {
break; \
}
+bool HasPrivateActionParams(EAction action, const TParameters& params) {
+ if (action == EAction::CreateQueue) {
+ return params.CreateTimestampSeconds || params.CustomQueueName;
+ }
+ return false;
+}
+
bool THttpRequest::SetupRequest() {
auto requestHolder = MakeHolder<TSqsRequest>();
requestHolder->SetRequestId(RequestId_);
+ if (HasPrivateActionParams(Action_, QueryParams_) && !IsPrivateRequest_) {
+ RLOG_SQS_BASE_ERROR(
+ *Parent_->ActorSystem_,
+ "Attempt to call private " << Action_ << " action format without private url path"
+ );
+ throw TSQSException(NErrors::INVALID_ACTION);
+ }
// Validate batches
if (IsBatchAction(Action_)) {
if (QueryParams_.BatchEntries.empty()) {
@@ -608,6 +622,13 @@ void THttpRequest::SetupCreateQueue(TCreateQueueRequest* const req) {
req->SetQueueName(QueueName_);
req->MutableAuth()->SetUserName(UserName_);
+ if (QueryParams_.CreateTimestampSeconds) {
+ req->SetCreatedTimestamp(QueryParams_.CreateTimestampSeconds.GetRef());
+ }
+ if (QueryParams_.CustomQueueName) {
+ req->SetCustomQueueName(QueryParams_.CustomQueueName.GetRef());
+ }
+
for (const auto& attr : QueryParams_.Attributes) {
req->AddAttributes()->CopyFrom(attr.second);
}
diff --git a/ydb/core/ymq/http/params.h b/ydb/core/ymq/http/params.h
index 7834fc4f0f..82aafc962c 100644
--- a/ydb/core/ymq/http/params.h
+++ b/ydb/core/ymq/http/params.h
@@ -30,6 +30,9 @@ struct TParameters {
TMaybe<TString> Version;
TMaybe<ui64> VisibilityTimeout;
TMaybe<ui64> WaitTimeSeconds;
+ TMaybe<ui64> CreateTimestampSeconds;
+ TMaybe<TString> CustomQueueName;
+
TMap<int, TString> AttributeNames;
TMap<int, TAttribute> Attributes;
diff --git a/ydb/core/ymq/http/parser.rl6 b/ydb/core/ymq/http/parser.rl6
index b617c334a3..3d50ec6a65 100644
--- a/ydb/core/ymq/http/parser.rl6
+++ b/ydb/core/ymq/http/parser.rl6
@@ -136,6 +136,8 @@ main := |*
('Version') { CurrentParams_->Version = value; };
('VisibilityTimeout') { CurrentParams_->VisibilityTimeout = ParseAndValidate(value, TStringBuf("VisibilityTimeout")); };
('WaitTimeSeconds') { CurrentParams_->WaitTimeSeconds = ParseAndValidate(value, TStringBuf("WaitTimeSeconds")); };
+ ('CreatedTimestamp') { CurrentParams_->CreateTimestampSeconds = ParseAndValidate(value, TStringBuf("CreatedTimestamp")); };
+ ('CustomQueueName') { CurrentParams_->CustomQueueName = ParseAndValidate(value, TStringBuf("CustomQueueName")); };
attribute;
attribute_name;
diff --git a/ydb/tests/library/sqs/requests_client.py b/ydb/tests/library/sqs/requests_client.py
index 6b7545503f..8a9f1648bf 100644
--- a/ydb/tests/library/sqs/requests_client.py
+++ b/ydb/tests/library/sqs/requests_client.py
@@ -198,7 +198,7 @@ class SqsHttpApi(object):
extract_result_method=lambda x: x['CountQueuesResponse']['CountQueuesResult']['Count'],
)
- def create_queue(self, queue_name, is_fifo=False, attributes=None):
+ def create_queue(self, queue_name, is_fifo=False, attributes=None, private_api=False, created_timestamp_sec=None, custom_name=None):
# if is_fifo and not queue_name.endswith('.fifo'):
# return None
if attributes is None:
@@ -207,12 +207,18 @@ class SqsHttpApi(object):
attributes = dict(attributes) # copy
attributes['FifoQueue'] = 'true'
params = {}
+ if created_timestamp_sec is not None:
+ params['CreatedTimestamp'] = created_timestamp_sec
+ if custom_name is not None:
+ params['CustomQueueName'] = custom_name
+
for i, (k, v) in enumerate(attributes.items()):
params['Attribute.{id}.Name'.format(id=i+1)] = k
params['Attribute.{id}.Value'.format(id=i + 1)] = v
return self.execute_request(
action='CreateQueue',
+ private=private_api,
extract_result_method=lambda x: x['CreateQueueResponse']['CreateQueueResult']['QueueUrl'],
QueueName=queue_name,
**params