diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-08 16:43:16 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-08 16:43:16 +0300 |
commit | 15c9338e60c485641784de34fc2084469e98395c (patch) | |
tree | 99cdded1e867c527999ca5eb415a932f5bb707b5 | |
parent | b30995f957aa40564245a566c3e1307c337a2844 (diff) | |
download | ydb-15c9338e60c485641784de34fc2084469e98395c.tar.gz |
[sqs] fix matching attrs and remove messages queries SQS-672
ref:55e30515e455f7e71be8a67664685ae446548ad9
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 3 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.cpp | 185 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.cpp | 44 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.h | 38 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/queries.cpp | 114 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/queries.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 17 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/test_multiplexing_tables_format.py | 15 |
9 files changed, 256 insertions, 164 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index c1a02b4bdf..250731b0df 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -2519,6 +2519,9 @@ void TQueueLeader::TDeleteBatch::Execute(TQueueLeader* leader) { .RetryOnTimeout() .OnExecuted([leader, shard = Shard, batchId = BatchId](const TSqsEvents::TEvExecuted::TRecord& ev) { leader->OnDeleteBatchExecuted(shard, batchId, ev); }) .Params() + .Uint64("QUEUE_ID_NUMBER", leader->QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(leader->QueueVersion_)) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(leader->QueueVersion_, Shard)) .Uint64("NOW", TActivationContext::Now().MilliSeconds()) .AddWithType("SHARD", Shard, leader->TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .Uint64("GROUPS_READ_ATTEMPT_IDS_PERIOD", Cfg().GetGroupsReadAttemptIdsPeriodMs()); diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp index e55b157bad..5de9ba2058 100644 --- a/ydb/core/ymq/actor/queue_schema.cpp +++ b/ydb/core/ymq/actor/queue_schema.cpp @@ -6,6 +6,7 @@ #include "serviceid.h" #include <ydb/core/ymq/base/limits.h> +#include <ydb/core/ymq/queues/common/db_queries_maker.h> #include <ydb/core/ymq/queues/common/key_hashes.h> #include <ydb/core/ymq/queues/fifo/schema.h> #include <ydb/core/ymq/queues/std/schema.h> @@ -170,7 +171,7 @@ static const char* const ReadQueueParamsQueryCloud = R"__( '('Account userName userName) '('QueueName (Utf8String '"") (Void)))) (let queues - (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId) '()) 'List)) + (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'TablesFormat) '()) 'List)) (let overLimit (LessOrEqual maxQueuesCountSetting (Length queues))) @@ -189,6 +190,10 @@ static const char* const ReadQueueParamsQueryCloud = R"__( (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'Version) (Uint64 '0) )) + (let currentTablesFormat (Coalesce + (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'TablesFormat) + (Uint32 '0) + )) (let existingResourceId (Coalesce (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'QueueName) (Utf8String '"") @@ -198,7 +203,8 @@ static const char* const ReadQueueParamsQueryCloud = R"__( (SetResult 'exists queueExists) (SetResult 'resourceId existingResourceId) (SetResult 'overLimit overLimit) - (SetResult 'version currentVersion))) + (SetResult 'version currentVersion) + (SetResult 'tablesFormat currentTablesFormat))) ) )__"; @@ -232,7 +238,8 @@ static const char* const ReadQueueParamsQueryYandex = R"__( '('QueueName name))) (let queuesSelect '( 'QueueState - 'Version)) + 'Version + 'TablesFormat)) (let queuesRead (SelectRow queuesTable queuesRow queuesSelect)) (let queueExists @@ -249,11 +256,18 @@ static const char* const ReadQueueParamsQueryYandex = R"__( (Uint64 '0) ) ) + (let currentTablesFormat + (Coalesce + (Member queuesRead 'TablesFormat) + (Uint32 '0) + ) + ) (return (AsList (SetResult 'exists queueExists) (SetResult 'overLimit overLimit) - (SetResult 'version currentVersion))) + (SetResult 'version currentVersion) + (SetResult 'tablesFormat currentTablesFormat))) ) )__"; @@ -321,7 +335,8 @@ void TCreateQueueSchemaActorV2::OnReadQueueParams(TSqsEvents::TEvExecuted::TPtr& ExistingQueueResourceId_ = TString(val["resourceId"]); } const ui64 currentVersion = ui64(val["version"]); - MatchQueueAttributes(currentVersion); + const ui32 currentTablesFormat = ui32(val["tablesFormat"]); + MatchQueueAttributes(currentVersion, currentTablesFormat); return; } else { if (bool(val["overLimit"])) { @@ -756,7 +771,7 @@ static const char* const CommitQueueParamsQuery = R"__( '('Account userName userName) '('QueueName (Utf8String '"") (Void)))) (let queues - (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'QueueState) '()) 'List)) + (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'QueueState 'TablesFormat) '()) 'List)) (let overLimit (LessOrEqual maxQueuesCountSetting (Length queues))) @@ -775,7 +790,8 @@ static const char* const CommitQueueParamsQuery = R"__( 'FifoQueue 'Shards 'Partitions - 'Version)) + 'Version + 'TablesFormat)) (let queuesRead (SelectRow queuesTable queuesRow queuesSelect)) (let existingQueuesWithSameNameAndFolderId @@ -815,6 +831,14 @@ static const char* const CommitQueueParamsQuery = R"__( ) ) + (let currentTablesFormat + (Coalesce + (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'TablesFormat) + (Member queuesRead 'TablesFormat) + tablesFormat + ) + ) + (let queuesUpdate '( '('QueueId id) '('CustomQueueName customName) @@ -872,6 +896,7 @@ static const char* const CommitQueueParamsQuery = R"__( (SetResult 'exists queueExists) (SetResult 'overLimit overLimit) (SetResult 'version currentVersion) + (SetResult 'tablesFormat currentTablesFormat) (SetResult 'resourceId existingResourceId) (SetResult 'commited willCommit)) @@ -1019,7 +1044,8 @@ void TCreateQueueSchemaActorV2::OnCommit(TSqsEvents::TEvExecuted::TPtr& ev) { ExistingQueueResourceId_ = TString(val["resourceId"]); } const ui64 currentVersion = ui64(val["version"]); - MatchQueueAttributes(currentVersion); + const ui32 currentTablesFormat = ui32(val["tablesFormat"]); + MatchQueueAttributes(currentVersion, currentTablesFormat); return; } else { Y_VERIFY(false); // unreachable @@ -1033,127 +1059,34 @@ void TCreateQueueSchemaActorV2::OnCommit(TSqsEvents::TEvExecuted::TPtr& ev) { PassAway(); } -static const char* const MatchQueueAttributesQuery = R"__( - ( - (let name (Parameter 'NAME (DataType 'Utf8String))) - (let fifo (Parameter 'FIFO (DataType 'Bool))) - (let shards (Parameter 'SHARDS (DataType 'Uint64))) - (let partitions (Parameter 'PARTITIONS (DataType 'Uint64))) - (let expectedVersion (Parameter 'EXPECTED_VERSION (DataType 'Uint64))) - (let maxSize (Parameter 'MAX_SIZE (DataType 'Uint64))) - (let delay (Parameter 'DELAY (DataType 'Uint64))) - (let visibility (Parameter 'VISIBILITY (DataType 'Uint64))) - (let retention (Parameter 'RETENTION (DataType 'Uint64))) - (let dlqName (Parameter 'DLQ_TARGET_NAME (DataType 'Utf8String))) - (let maxReceiveCount (Parameter 'MAX_RECEIVE_COUNT (DataType 'Uint64))) - (let userName (Parameter 'USER_NAME (DataType 'Utf8String))) - - (let attrsTable '%1$s/%2$s/Attributes) - (let queuesTable '%3$s/.Queues) - - (let queuesRange '( - '('Account userName userName) - '('QueueName (Utf8String '"") (Void)))) - (let queues - (Member (SelectRange queuesTable queuesRange '('QueueState) '()) 'List)) - - (let queuesRow '( - '('Account userName) - '('QueueName name))) - (let queuesSelect '( - 'QueueState - 'QueueId - 'FifoQueue - 'Shards - 'Partitions - 'DlqName - 'Version)) - (let queuesRead (SelectRow queuesTable queuesRow queuesSelect)) - - (let queueExists - (Coalesce - (Or - (Equal (Uint64 '1) (Member queuesRead 'QueueState)) - (Equal (Uint64 '3) (Member queuesRead 'QueueState)) - ) - (Bool 'false))) - - (let currentVersion - (Coalesce - (Member queuesRead 'Version) - (Uint64 '0) - ) - ) - - (let sameParams - (Coalesce - (And - (And - (And (Equal (Member queuesRead 'Shards) shards) - (Equal (Member queuesRead 'Partitions) partitions)) - (Equal (Member queuesRead 'FifoQueue) fifo)) - (Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName)) - (Bool 'true))) - - (let attrRow '( - '('State (Uint64 '0)))) - (let attrSelect '( - 'DelaySeconds - 'MaximumMessageSize - 'MessageRetentionPeriod - 'MaxReceiveCount - 'VisibilityTimeout)) - (let attrRead (SelectRow attrsTable attrRow attrSelect)) - - (let sameAttributes - (Coalesce - (And - (And - (And (Equal (Member attrRead 'DelaySeconds) delay) - (And (Equal (Member attrRead 'MaximumMessageSize) maxSize) - (Equal (Member attrRead 'MessageRetentionPeriod) retention))) - (Equal (Member attrRead 'VisibilityTimeout) visibility)) - (Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount)) - (Bool 'true))) - - (let sameVersion - (Equal currentVersion expectedVersion)) - - (let isSame - (And - queueExists - (And - sameVersion - (And - sameAttributes - sameParams)))) - - (let existingQueueId - (Coalesce - (Member queuesRead 'QueueId) - (String '""))) - - (return (AsList - (SetResult 'exists queueExists) - (SetResult 'sameVersion sameVersion) - (SetResult 'id existingQueueId) - (SetResult 'isSame isSame))) - ) -)__"; - -void TCreateQueueSchemaActorV2::MatchQueueAttributes(const ui64 currentVersion) { +void TCreateQueueSchemaActorV2::MatchQueueAttributes( + const ui64 currentVersion, + const ui32 currentTablesFormat +) { Become(&TCreateQueueSchemaActorV2::MatchAttributes); - TString versionedQueuePath = IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName; - if (currentVersion != 0) { - // modern-way constructed queue requires version suffix - versionedQueuePath = TString::Join(versionedQueuePath, "/v", ToString(currentVersion)); - } - auto ev = MakeExecuteEvent(Sprintf( - MatchQueueAttributesQuery, QueuePath_.GetUserPath().c_str(), versionedQueuePath.c_str(), Cfg().GetRoot().c_str() - )); + Y_VERIFY(currentVersion != 0); + + TDbQueriesMaker queryMaker( + Cfg().GetRoot(), + QueuePath_.UserName, + QueuePath_.QueueName, + currentVersion, + IsFifo_, + 0, + currentTablesFormat, + "", // dlqName + 0, // dlqShard + 0, // dlqVersion + 0 // dlqTablesFormat + ); + auto query = queryMaker.GetMatchQueueAttributesQuery(); + + auto ev = MakeExecuteEvent(query); auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction(); TParameters(trans->MutableParams()->MutableProto()) + .Uint64("QUEUE_ID_NUMBER", currentVersion) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(currentVersion)) .Utf8("NAME", IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName) .Bool("FIFO", IsFifo_) .Uint64("SHARDS", RequiredShardsCount_) @@ -1203,7 +1136,7 @@ void TCreateQueueSchemaActorV2::OnAttributesMatch(TSqsEvents::TEvExecuted::TPtr& resp->ErrorClass = &NErrors::VALIDATION_ERROR; } - if (CurrentCreationStep_ == ECreateComponentsStep::DiscoverLeaderTabletId) { + if (TablesFormat_ == 0 && CurrentCreationStep_ == ECreateComponentsStep::DiscoverLeaderTabletId) { // call the special version of cleanup actor RLOG_SQS_WARN("Removing redundant queue version: " << Version_ << " for queue " << QueuePath_.GetQueuePath() << ". Shards: " << RequiredShardsCount_ << " IsFifo: " << IsFifo_); diff --git a/ydb/core/ymq/actor/queue_schema.h b/ydb/core/ymq/actor/queue_schema.h index 793ba1c06a..6ff2d5aa06 100644 --- a/ydb/core/ymq/actor/queue_schema.h +++ b/ydb/core/ymq/actor/queue_schema.h @@ -74,7 +74,7 @@ public: void OnCommit(TSqsEvents::TEvExecuted::TPtr& ev); - void MatchQueueAttributes(const ui64 currentVersion); + void MatchQueueAttributes(const ui64 currentVersion, const ui32 currentTablesFormat); STATEFN(MatchAttributes); diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp index 6d3aca8f13..47c16653ff 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.cpp +++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp @@ -1 +1,45 @@ #include "db_queries_maker.h"
+
+#include "queries.h"
+
+
+namespace NKikimr::NSQS {
+
+ TString TDbQueriesMaker::FillQuery(const char* query) const {
+ return Sprintf(
+ query,
+ Root_.c_str(), // 1
+ QueueTablesFolderPerShard_.c_str(), // 2
+ QueueTablesFolder_.c_str(), // 3
+
+ QueueName_.c_str(), // 4
+ GetIdKeys(), // 5
+ GetIdKeysRange(), // 6
+ GetIdAndShardKeys(), // 7
+ GetIdAndShardKeysRange(), // 8
+ GetShardColumnType(TablesFormat_), // 9
+ GetShardColumnName(), // 10
+ GetStateKeys(), // 11
+ GetAttrKeys(), // 12
+ GetAllShardsRange(), // 13
+
+ DlqTablesFolder_.c_str(), // 14
+ DlqTablesFolderPerShard_.c_str(), // 15
+
+ GetDlqIdKeys(), // 16
+ GetDlqIdAndShardKeys(), // 17
+ GetShardColumnType(DlqTablesFormat_), // 18
+ GetDlqStateKeys() // 19
+ );
+ }
+
+ TString TDbQueriesMaker::operator() (EQueryId id) const {
+ return FillQuery(GetQueryById(id));
+ }
+
+ TString TDbQueriesMaker::GetMatchQueueAttributesQuery() const {
+ return FillQuery(MatchQueueAttributesQuery);
+ }
+
+
+} // namespace NKikimr::NSQS
\ No newline at end of file diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h index 5ed4fde06f..e2da48d51d 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.h +++ b/ydb/core/ymq/queues/common/db_queries_maker.h @@ -77,35 +77,10 @@ public: );
}
- TString operator() (EQueryId id) {
- TString result = Sprintf(
- GetQueryById(id),
- Root_.c_str(), // 1
- QueueTablesFolderPerShard_.c_str(), // 2
- QueueTablesFolder_.c_str(), // 3
-
- QueueName_.c_str(), // 4
- GetIdKeys(), // 5
- GetIdKeysRange(), // 6
- GetIdAndShardKeys(), // 7
- GetIdAndShardKeysRange(), // 8
- GetShardColumnType(TablesFormat_), // 9
- GetShardColumnName(), // 10
- GetStateKeys(), // 11
- GetAttrKeys(), // 12
- GetAllShardsRange(), // 13
-
- DlqTablesFolder_.c_str(), // 14
- DlqTablesFolderPerShard_.c_str(), // 15
-
- GetDlqIdKeys(), // 16
- GetDlqIdAndShardKeys(), // 17
- GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys() // 19
- );
- return result;
- }
+ TString operator() (EQueryId id) const;
+ TString GetMatchQueueAttributesQuery() const;
+private:
const char* GetStateKeys() const {
if (TablesFormat_ == 0) {
return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
@@ -162,7 +137,7 @@ public: ui64 shard,
TString& tablesFolder,
TString& tablesFolderPerShard
- ) {
+ ) const {
if (tablesFormat == 1) {
tablesFolder = tablesFolderPerShard = TStringBuilder() << Root_ << "/" << (IsFifo_ ? ".FIFO" : ".STD");
} else {
@@ -172,13 +147,14 @@ public: }
}
-private:
- const char* GetQueryById(EQueryId id) {
+ const char* GetQueryById(EQueryId id) const {
const char* query = IsFifo_ ? GetFifoQueryById(id) : GetStdQueryById(id);
Y_VERIFY(query);
return query;
}
+ TString FillQuery(const char* query) const;
+
private:
TString Root_;
TString QueueTablesFolder_;
diff --git a/ydb/core/ymq/queues/common/queries.cpp b/ydb/core/ymq/queues/common/queries.cpp index a646821b22..6badd26b62 100644 --- a/ydb/core/ymq/queues/common/queries.cpp +++ b/ydb/core/ymq/queues/common/queries.cpp @@ -1,5 +1,7 @@ #include "queries.h" +#include "db_queries_defs.h" + namespace NKikimr::NSQS { extern const char* const GetQueueParamsQuery = R"__( @@ -32,4 +34,116 @@ extern const char* const GetQueueParamsQuery = R"__( ) )__"; +extern const char* const MatchQueueAttributesQuery = R"__( + ( + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) + (let name (Parameter 'NAME (DataType 'Utf8String))) + (let fifo (Parameter 'FIFO (DataType 'Bool))) + (let shards (Parameter 'SHARDS (DataType 'Uint64))) + (let partitions (Parameter 'PARTITIONS (DataType 'Uint64))) + (let expectedVersion (Parameter 'EXPECTED_VERSION (DataType 'Uint64))) + (let maxSize (Parameter 'MAX_SIZE (DataType 'Uint64))) + (let delay (Parameter 'DELAY (DataType 'Uint64))) + (let visibility (Parameter 'VISIBILITY (DataType 'Uint64))) + (let retention (Parameter 'RETENTION (DataType 'Uint64))) + (let dlqName (Parameter 'DLQ_TARGET_NAME (DataType 'Utf8String))) + (let maxReceiveCount (Parameter 'MAX_RECEIVE_COUNT (DataType 'Uint64))) + (let userName (Parameter 'USER_NAME (DataType 'Utf8String))) + + (let attrsTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/Attributes) + (let queuesTable ')__" ROOT_PARAM R"__(/.Queues) + + (let queuesRange '( + '('Account userName userName) + '('QueueName (Utf8String '"") (Void)))) + (let queues + (Member (SelectRange queuesTable queuesRange '('QueueState) '()) 'List)) + + (let queuesRow '( + '('Account userName) + '('QueueName name))) + (let queuesSelect '( + 'QueueState + 'QueueId + 'FifoQueue + 'Shards + 'Partitions + 'DlqName + 'Version)) + (let queuesRead (SelectRow queuesTable queuesRow queuesSelect)) + + (let queueExists + (Coalesce + (Or + (Equal (Uint64 '1) (Member queuesRead 'QueueState)) + (Equal (Uint64 '3) (Member queuesRead 'QueueState)) + ) + (Bool 'false))) + + (let currentVersion + (Coalesce + (Member queuesRead 'Version) + (Uint64 '0) + ) + ) + + (let sameParams + (Coalesce + (And + (And + (And (Equal (Member queuesRead 'Shards) shards) + (Equal (Member queuesRead 'Partitions) partitions)) + (Equal (Member queuesRead 'FifoQueue) fifo)) + (Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName)) + (Bool 'true))) + + (let attrRow '( + )__" ATTRS_KEYS_PARAM R"__( + )) + (let attrSelect '( + 'DelaySeconds + 'MaximumMessageSize + 'MessageRetentionPeriod + 'MaxReceiveCount + 'VisibilityTimeout)) + (let attrRead (SelectRow attrsTable attrRow attrSelect)) + + (let sameAttributes + (Coalesce + (And + (And + (And (Equal (Member attrRead 'DelaySeconds) delay) + (And (Equal (Member attrRead 'MaximumMessageSize) maxSize) + (Equal (Member attrRead 'MessageRetentionPeriod) retention))) + (Equal (Member attrRead 'VisibilityTimeout) visibility)) + (Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount)) + (Bool 'true))) + + (let sameVersion + (Equal currentVersion expectedVersion)) + + (let isSame + (And + queueExists + (And + sameVersion + (And + sameAttributes + sameParams)))) + + (let existingQueueId + (Coalesce + (Member queuesRead 'QueueId) + (String '""))) + + (return (AsList + (SetResult 'exists queueExists) + (SetResult 'sameVersion sameVersion) + (SetResult 'id existingQueueId) + (SetResult 'isSame isSame))) + ) +)__"; + + } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/queues/common/queries.h b/ydb/core/ymq/queues/common/queries.h index 4f455213fe..966e74fbc4 100644 --- a/ydb/core/ymq/queues/common/queries.h +++ b/ydb/core/ymq/queues/common/queries.h @@ -4,4 +4,6 @@ namespace NKikimr::NSQS { extern const char* const GetQueueParamsQuery; +extern const char* const MatchQueueAttributesQuery; + } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index 0ac0360ca3..8b0cc55b39 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -370,7 +370,6 @@ const char* const DeleteMessageQuery = R"__( '('Offset (DataType 'Uint64)) '('LockTimestamp (DataType 'Uint64)))))) (let now (Parameter 'NOW (DataType 'Uint64))) - (let shard (Parameter 'SHARD (DataType 'Uint64))) (let dataTable ')__" QUEUE_TABLES_FOLDER_PER_SHARD_PARAM R"__(/MessageData) (let inflyTable ')__" QUEUE_TABLES_FOLDER_PER_SHARD_PARAM R"__(/Infly) @@ -380,7 +379,9 @@ const char* const DeleteMessageQuery = R"__( (let records (MapParameter keys (lambda '(item) (block '( (let row '( - '('Offset (Member item 'Offset)))) + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( + '('Offset (Member item 'Offset)) + )) (let fields '( 'Offset 'RandomId @@ -424,23 +425,29 @@ const char* const DeleteMessageQuery = R"__( (If deleteCond (Map existed (lambda '(item) (block '( (let row '( - '('Offset (Member item 'Offset)))) + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( + '('Offset (Member item 'Offset)) + )) (return (EraseRow inflyTable row)))))) (AsList (Void))) (If deleteCond (Map existed (lambda '(item) (block '( (let row '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('RandomId (Member item 'RandomId)) - '('Offset (Member item 'Offset)))) + '('Offset (Member item 'Offset)) + )) (return (EraseRow dataTable row)))))) (AsList (Void))) (If deleteCond (Map existed (lambda '(item) (block '( (let row '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('SentTimestamp (Member item 'SentTimestamp)) - '('Offset (Member item 'Offset)))) + '('Offset (Member item 'Offset)) + )) (return (EraseRow sentTsIdx row)))))) (AsList (Void))) )) diff --git a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py index 3b28a15282..bfc8734958 100644 --- a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py +++ b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py @@ -19,7 +19,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): ) def create_queue(self, is_fifo): - if is_fifo: + if is_fifo and not self.queue_name.endswith('.fifo'): self.queue_name = self.queue_name + '.fifo' return self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo) @@ -78,6 +78,19 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): matcher=ReadResponseMatcher().with_message_ids([message_id, ]) ) + def do_test_double_create(self, is_fifo, tables_format): + self._set_new_format_settings(tables_format=tables_format) + self.create_queue(is_fifo) + self.create_queue(is_fifo) + + @pytest.mark.parametrize(**IS_FIFO_PARAMS) + def test_double_create(self, is_fifo): + self.do_test_double_create(is_fifo, tables_format='1') + + @pytest.mark.parametrize(**IS_FIFO_PARAMS) + def test_double_create_old(self, is_fifo): + self.do_test_double_create(is_fifo, tables_format='0') + class TestMultiplexingTablesFormatWithTenant(get_test_with_sqs_tenant_installation(MultiplexingTablesFormatTest)): pass |