aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-08 16:43:16 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-08 16:43:16 +0300
commit15c9338e60c485641784de34fc2084469e98395c (patch)
tree99cdded1e867c527999ca5eb415a932f5bb707b5
parentb30995f957aa40564245a566c3e1307c337a2844 (diff)
downloadydb-15c9338e60c485641784de34fc2084469e98395c.tar.gz
[sqs] fix matching attrs and remove messages queries SQS-672
ref:55e30515e455f7e71be8a67664685ae446548ad9
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp3
-rw-r--r--ydb/core/ymq/actor/queue_schema.cpp185
-rw-r--r--ydb/core/ymq/actor/queue_schema.h2
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.cpp44
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.h38
-rw-r--r--ydb/core/ymq/queues/common/queries.cpp114
-rw-r--r--ydb/core/ymq/queues/common/queries.h2
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp17
-rw-r--r--ydb/tests/functional/sqs/test_multiplexing_tables_format.py15
9 files changed, 256 insertions, 164 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index c1a02b4bdf..250731b0df 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -2519,6 +2519,9 @@ void TQueueLeader::TDeleteBatch::Execute(TQueueLeader* leader) {
.RetryOnTimeout()
.OnExecuted([leader, shard = Shard, batchId = BatchId](const TSqsEvents::TEvExecuted::TRecord& ev) { leader->OnDeleteBatchExecuted(shard, batchId, ev); })
.Params()
+ .Uint64("QUEUE_ID_NUMBER", leader->QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(leader->QueueVersion_))
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(leader->QueueVersion_, Shard))
.Uint64("NOW", TActivationContext::Now().MilliSeconds())
.AddWithType("SHARD", Shard, leader->TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.Uint64("GROUPS_READ_ATTEMPT_IDS_PERIOD", Cfg().GetGroupsReadAttemptIdsPeriodMs());
diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp
index e55b157bad..5de9ba2058 100644
--- a/ydb/core/ymq/actor/queue_schema.cpp
+++ b/ydb/core/ymq/actor/queue_schema.cpp
@@ -6,6 +6,7 @@
#include "serviceid.h"
#include <ydb/core/ymq/base/limits.h>
+#include <ydb/core/ymq/queues/common/db_queries_maker.h>
#include <ydb/core/ymq/queues/common/key_hashes.h>
#include <ydb/core/ymq/queues/fifo/schema.h>
#include <ydb/core/ymq/queues/std/schema.h>
@@ -170,7 +171,7 @@ static const char* const ReadQueueParamsQueryCloud = R"__(
'('Account userName userName)
'('QueueName (Utf8String '"") (Void))))
(let queues
- (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId) '()) 'List))
+ (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'TablesFormat) '()) 'List))
(let overLimit
(LessOrEqual maxQueuesCountSetting (Length queues)))
@@ -189,6 +190,10 @@ static const char* const ReadQueueParamsQueryCloud = R"__(
(Member (ToOptional existingQueuesWithSameNameAndFolderId) 'Version)
(Uint64 '0)
))
+ (let currentTablesFormat (Coalesce
+ (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'TablesFormat)
+ (Uint32 '0)
+ ))
(let existingResourceId (Coalesce
(Member (ToOptional existingQueuesWithSameNameAndFolderId) 'QueueName)
(Utf8String '"")
@@ -198,7 +203,8 @@ static const char* const ReadQueueParamsQueryCloud = R"__(
(SetResult 'exists queueExists)
(SetResult 'resourceId existingResourceId)
(SetResult 'overLimit overLimit)
- (SetResult 'version currentVersion)))
+ (SetResult 'version currentVersion)
+ (SetResult 'tablesFormat currentTablesFormat)))
)
)__";
@@ -232,7 +238,8 @@ static const char* const ReadQueueParamsQueryYandex = R"__(
'('QueueName name)))
(let queuesSelect '(
'QueueState
- 'Version))
+ 'Version
+ 'TablesFormat))
(let queuesRead (SelectRow queuesTable queuesRow queuesSelect))
(let queueExists
@@ -249,11 +256,18 @@ static const char* const ReadQueueParamsQueryYandex = R"__(
(Uint64 '0)
)
)
+ (let currentTablesFormat
+ (Coalesce
+ (Member queuesRead 'TablesFormat)
+ (Uint32 '0)
+ )
+ )
(return (AsList
(SetResult 'exists queueExists)
(SetResult 'overLimit overLimit)
- (SetResult 'version currentVersion)))
+ (SetResult 'version currentVersion)
+ (SetResult 'tablesFormat currentTablesFormat)))
)
)__";
@@ -321,7 +335,8 @@ void TCreateQueueSchemaActorV2::OnReadQueueParams(TSqsEvents::TEvExecuted::TPtr&
ExistingQueueResourceId_ = TString(val["resourceId"]);
}
const ui64 currentVersion = ui64(val["version"]);
- MatchQueueAttributes(currentVersion);
+ const ui32 currentTablesFormat = ui32(val["tablesFormat"]);
+ MatchQueueAttributes(currentVersion, currentTablesFormat);
return;
} else {
if (bool(val["overLimit"])) {
@@ -756,7 +771,7 @@ static const char* const CommitQueueParamsQuery = R"__(
'('Account userName userName)
'('QueueName (Utf8String '"") (Void))))
(let queues
- (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'QueueState) '()) 'List))
+ (Member (SelectRange queuesTable queuesRange '('QueueName 'CustomQueueName 'Version 'FolderId 'QueueState 'TablesFormat) '()) 'List))
(let overLimit
(LessOrEqual maxQueuesCountSetting (Length queues)))
@@ -775,7 +790,8 @@ static const char* const CommitQueueParamsQuery = R"__(
'FifoQueue
'Shards
'Partitions
- 'Version))
+ 'Version
+ 'TablesFormat))
(let queuesRead (SelectRow queuesTable queuesRow queuesSelect))
(let existingQueuesWithSameNameAndFolderId
@@ -815,6 +831,14 @@ static const char* const CommitQueueParamsQuery = R"__(
)
)
+ (let currentTablesFormat
+ (Coalesce
+ (Member (ToOptional existingQueuesWithSameNameAndFolderId) 'TablesFormat)
+ (Member queuesRead 'TablesFormat)
+ tablesFormat
+ )
+ )
+
(let queuesUpdate '(
'('QueueId id)
'('CustomQueueName customName)
@@ -872,6 +896,7 @@ static const char* const CommitQueueParamsQuery = R"__(
(SetResult 'exists queueExists)
(SetResult 'overLimit overLimit)
(SetResult 'version currentVersion)
+ (SetResult 'tablesFormat currentTablesFormat)
(SetResult 'resourceId existingResourceId)
(SetResult 'commited willCommit))
@@ -1019,7 +1044,8 @@ void TCreateQueueSchemaActorV2::OnCommit(TSqsEvents::TEvExecuted::TPtr& ev) {
ExistingQueueResourceId_ = TString(val["resourceId"]);
}
const ui64 currentVersion = ui64(val["version"]);
- MatchQueueAttributes(currentVersion);
+ const ui32 currentTablesFormat = ui32(val["tablesFormat"]);
+ MatchQueueAttributes(currentVersion, currentTablesFormat);
return;
} else {
Y_VERIFY(false); // unreachable
@@ -1033,127 +1059,34 @@ void TCreateQueueSchemaActorV2::OnCommit(TSqsEvents::TEvExecuted::TPtr& ev) {
PassAway();
}
-static const char* const MatchQueueAttributesQuery = R"__(
- (
- (let name (Parameter 'NAME (DataType 'Utf8String)))
- (let fifo (Parameter 'FIFO (DataType 'Bool)))
- (let shards (Parameter 'SHARDS (DataType 'Uint64)))
- (let partitions (Parameter 'PARTITIONS (DataType 'Uint64)))
- (let expectedVersion (Parameter 'EXPECTED_VERSION (DataType 'Uint64)))
- (let maxSize (Parameter 'MAX_SIZE (DataType 'Uint64)))
- (let delay (Parameter 'DELAY (DataType 'Uint64)))
- (let visibility (Parameter 'VISIBILITY (DataType 'Uint64)))
- (let retention (Parameter 'RETENTION (DataType 'Uint64)))
- (let dlqName (Parameter 'DLQ_TARGET_NAME (DataType 'Utf8String)))
- (let maxReceiveCount (Parameter 'MAX_RECEIVE_COUNT (DataType 'Uint64)))
- (let userName (Parameter 'USER_NAME (DataType 'Utf8String)))
-
- (let attrsTable '%1$s/%2$s/Attributes)
- (let queuesTable '%3$s/.Queues)
-
- (let queuesRange '(
- '('Account userName userName)
- '('QueueName (Utf8String '"") (Void))))
- (let queues
- (Member (SelectRange queuesTable queuesRange '('QueueState) '()) 'List))
-
- (let queuesRow '(
- '('Account userName)
- '('QueueName name)))
- (let queuesSelect '(
- 'QueueState
- 'QueueId
- 'FifoQueue
- 'Shards
- 'Partitions
- 'DlqName
- 'Version))
- (let queuesRead (SelectRow queuesTable queuesRow queuesSelect))
-
- (let queueExists
- (Coalesce
- (Or
- (Equal (Uint64 '1) (Member queuesRead 'QueueState))
- (Equal (Uint64 '3) (Member queuesRead 'QueueState))
- )
- (Bool 'false)))
-
- (let currentVersion
- (Coalesce
- (Member queuesRead 'Version)
- (Uint64 '0)
- )
- )
-
- (let sameParams
- (Coalesce
- (And
- (And
- (And (Equal (Member queuesRead 'Shards) shards)
- (Equal (Member queuesRead 'Partitions) partitions))
- (Equal (Member queuesRead 'FifoQueue) fifo))
- (Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName))
- (Bool 'true)))
-
- (let attrRow '(
- '('State (Uint64 '0))))
- (let attrSelect '(
- 'DelaySeconds
- 'MaximumMessageSize
- 'MessageRetentionPeriod
- 'MaxReceiveCount
- 'VisibilityTimeout))
- (let attrRead (SelectRow attrsTable attrRow attrSelect))
-
- (let sameAttributes
- (Coalesce
- (And
- (And
- (And (Equal (Member attrRead 'DelaySeconds) delay)
- (And (Equal (Member attrRead 'MaximumMessageSize) maxSize)
- (Equal (Member attrRead 'MessageRetentionPeriod) retention)))
- (Equal (Member attrRead 'VisibilityTimeout) visibility))
- (Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount))
- (Bool 'true)))
-
- (let sameVersion
- (Equal currentVersion expectedVersion))
-
- (let isSame
- (And
- queueExists
- (And
- sameVersion
- (And
- sameAttributes
- sameParams))))
-
- (let existingQueueId
- (Coalesce
- (Member queuesRead 'QueueId)
- (String '"")))
-
- (return (AsList
- (SetResult 'exists queueExists)
- (SetResult 'sameVersion sameVersion)
- (SetResult 'id existingQueueId)
- (SetResult 'isSame isSame)))
- )
-)__";
-
-void TCreateQueueSchemaActorV2::MatchQueueAttributes(const ui64 currentVersion) {
+void TCreateQueueSchemaActorV2::MatchQueueAttributes(
+ const ui64 currentVersion,
+ const ui32 currentTablesFormat
+) {
Become(&TCreateQueueSchemaActorV2::MatchAttributes);
- TString versionedQueuePath = IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName;
- if (currentVersion != 0) {
- // modern-way constructed queue requires version suffix
- versionedQueuePath = TString::Join(versionedQueuePath, "/v", ToString(currentVersion));
- }
- auto ev = MakeExecuteEvent(Sprintf(
- MatchQueueAttributesQuery, QueuePath_.GetUserPath().c_str(), versionedQueuePath.c_str(), Cfg().GetRoot().c_str()
- ));
+ Y_VERIFY(currentVersion != 0);
+
+ TDbQueriesMaker queryMaker(
+ Cfg().GetRoot(),
+ QueuePath_.UserName,
+ QueuePath_.QueueName,
+ currentVersion,
+ IsFifo_,
+ 0,
+ currentTablesFormat,
+ "", // dlqName
+ 0, // dlqShard
+ 0, // dlqVersion
+ 0 // dlqTablesFormat
+ );
+ auto query = queryMaker.GetMatchQueueAttributesQuery();
+
+ auto ev = MakeExecuteEvent(query);
auto* trans = ev->Record.MutableTransaction()->MutableMiniKQLTransaction();
TParameters(trans->MutableParams()->MutableProto())
+ .Uint64("QUEUE_ID_NUMBER", currentVersion)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(currentVersion))
.Utf8("NAME", IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName)
.Bool("FIFO", IsFifo_)
.Uint64("SHARDS", RequiredShardsCount_)
@@ -1203,7 +1136,7 @@ void TCreateQueueSchemaActorV2::OnAttributesMatch(TSqsEvents::TEvExecuted::TPtr&
resp->ErrorClass = &NErrors::VALIDATION_ERROR;
}
- if (CurrentCreationStep_ == ECreateComponentsStep::DiscoverLeaderTabletId) {
+ if (TablesFormat_ == 0 && CurrentCreationStep_ == ECreateComponentsStep::DiscoverLeaderTabletId) {
// call the special version of cleanup actor
RLOG_SQS_WARN("Removing redundant queue version: " << Version_ << " for queue " <<
QueuePath_.GetQueuePath() << ". Shards: " << RequiredShardsCount_ << " IsFifo: " << IsFifo_);
diff --git a/ydb/core/ymq/actor/queue_schema.h b/ydb/core/ymq/actor/queue_schema.h
index 793ba1c06a..6ff2d5aa06 100644
--- a/ydb/core/ymq/actor/queue_schema.h
+++ b/ydb/core/ymq/actor/queue_schema.h
@@ -74,7 +74,7 @@ public:
void OnCommit(TSqsEvents::TEvExecuted::TPtr& ev);
- void MatchQueueAttributes(const ui64 currentVersion);
+ void MatchQueueAttributes(const ui64 currentVersion, const ui32 currentTablesFormat);
STATEFN(MatchAttributes);
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp
index 6d3aca8f13..47c16653ff 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.cpp
+++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp
@@ -1 +1,45 @@
#include "db_queries_maker.h"
+
+#include "queries.h"
+
+
+namespace NKikimr::NSQS {
+
+ TString TDbQueriesMaker::FillQuery(const char* query) const {
+ return Sprintf(
+ query,
+ Root_.c_str(), // 1
+ QueueTablesFolderPerShard_.c_str(), // 2
+ QueueTablesFolder_.c_str(), // 3
+
+ QueueName_.c_str(), // 4
+ GetIdKeys(), // 5
+ GetIdKeysRange(), // 6
+ GetIdAndShardKeys(), // 7
+ GetIdAndShardKeysRange(), // 8
+ GetShardColumnType(TablesFormat_), // 9
+ GetShardColumnName(), // 10
+ GetStateKeys(), // 11
+ GetAttrKeys(), // 12
+ GetAllShardsRange(), // 13
+
+ DlqTablesFolder_.c_str(), // 14
+ DlqTablesFolderPerShard_.c_str(), // 15
+
+ GetDlqIdKeys(), // 16
+ GetDlqIdAndShardKeys(), // 17
+ GetShardColumnType(DlqTablesFormat_), // 18
+ GetDlqStateKeys() // 19
+ );
+ }
+
+ TString TDbQueriesMaker::operator() (EQueryId id) const {
+ return FillQuery(GetQueryById(id));
+ }
+
+ TString TDbQueriesMaker::GetMatchQueueAttributesQuery() const {
+ return FillQuery(MatchQueueAttributesQuery);
+ }
+
+
+} // namespace NKikimr::NSQS \ No newline at end of file
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h
index 5ed4fde06f..e2da48d51d 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.h
+++ b/ydb/core/ymq/queues/common/db_queries_maker.h
@@ -77,35 +77,10 @@ public:
);
}
- TString operator() (EQueryId id) {
- TString result = Sprintf(
- GetQueryById(id),
- Root_.c_str(), // 1
- QueueTablesFolderPerShard_.c_str(), // 2
- QueueTablesFolder_.c_str(), // 3
-
- QueueName_.c_str(), // 4
- GetIdKeys(), // 5
- GetIdKeysRange(), // 6
- GetIdAndShardKeys(), // 7
- GetIdAndShardKeysRange(), // 8
- GetShardColumnType(TablesFormat_), // 9
- GetShardColumnName(), // 10
- GetStateKeys(), // 11
- GetAttrKeys(), // 12
- GetAllShardsRange(), // 13
-
- DlqTablesFolder_.c_str(), // 14
- DlqTablesFolderPerShard_.c_str(), // 15
-
- GetDlqIdKeys(), // 16
- GetDlqIdAndShardKeys(), // 17
- GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys() // 19
- );
- return result;
- }
+ TString operator() (EQueryId id) const;
+ TString GetMatchQueueAttributesQuery() const;
+private:
const char* GetStateKeys() const {
if (TablesFormat_ == 0) {
return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
@@ -162,7 +137,7 @@ public:
ui64 shard,
TString& tablesFolder,
TString& tablesFolderPerShard
- ) {
+ ) const {
if (tablesFormat == 1) {
tablesFolder = tablesFolderPerShard = TStringBuilder() << Root_ << "/" << (IsFifo_ ? ".FIFO" : ".STD");
} else {
@@ -172,13 +147,14 @@ public:
}
}
-private:
- const char* GetQueryById(EQueryId id) {
+ const char* GetQueryById(EQueryId id) const {
const char* query = IsFifo_ ? GetFifoQueryById(id) : GetStdQueryById(id);
Y_VERIFY(query);
return query;
}
+ TString FillQuery(const char* query) const;
+
private:
TString Root_;
TString QueueTablesFolder_;
diff --git a/ydb/core/ymq/queues/common/queries.cpp b/ydb/core/ymq/queues/common/queries.cpp
index a646821b22..6badd26b62 100644
--- a/ydb/core/ymq/queues/common/queries.cpp
+++ b/ydb/core/ymq/queues/common/queries.cpp
@@ -1,5 +1,7 @@
#include "queries.h"
+#include "db_queries_defs.h"
+
namespace NKikimr::NSQS {
extern const char* const GetQueueParamsQuery = R"__(
@@ -32,4 +34,116 @@ extern const char* const GetQueueParamsQuery = R"__(
)
)__";
+extern const char* const MatchQueueAttributesQuery = R"__(
+ (
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
+ (let name (Parameter 'NAME (DataType 'Utf8String)))
+ (let fifo (Parameter 'FIFO (DataType 'Bool)))
+ (let shards (Parameter 'SHARDS (DataType 'Uint64)))
+ (let partitions (Parameter 'PARTITIONS (DataType 'Uint64)))
+ (let expectedVersion (Parameter 'EXPECTED_VERSION (DataType 'Uint64)))
+ (let maxSize (Parameter 'MAX_SIZE (DataType 'Uint64)))
+ (let delay (Parameter 'DELAY (DataType 'Uint64)))
+ (let visibility (Parameter 'VISIBILITY (DataType 'Uint64)))
+ (let retention (Parameter 'RETENTION (DataType 'Uint64)))
+ (let dlqName (Parameter 'DLQ_TARGET_NAME (DataType 'Utf8String)))
+ (let maxReceiveCount (Parameter 'MAX_RECEIVE_COUNT (DataType 'Uint64)))
+ (let userName (Parameter 'USER_NAME (DataType 'Utf8String)))
+
+ (let attrsTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/Attributes)
+ (let queuesTable ')__" ROOT_PARAM R"__(/.Queues)
+
+ (let queuesRange '(
+ '('Account userName userName)
+ '('QueueName (Utf8String '"") (Void))))
+ (let queues
+ (Member (SelectRange queuesTable queuesRange '('QueueState) '()) 'List))
+
+ (let queuesRow '(
+ '('Account userName)
+ '('QueueName name)))
+ (let queuesSelect '(
+ 'QueueState
+ 'QueueId
+ 'FifoQueue
+ 'Shards
+ 'Partitions
+ 'DlqName
+ 'Version))
+ (let queuesRead (SelectRow queuesTable queuesRow queuesSelect))
+
+ (let queueExists
+ (Coalesce
+ (Or
+ (Equal (Uint64 '1) (Member queuesRead 'QueueState))
+ (Equal (Uint64 '3) (Member queuesRead 'QueueState))
+ )
+ (Bool 'false)))
+
+ (let currentVersion
+ (Coalesce
+ (Member queuesRead 'Version)
+ (Uint64 '0)
+ )
+ )
+
+ (let sameParams
+ (Coalesce
+ (And
+ (And
+ (And (Equal (Member queuesRead 'Shards) shards)
+ (Equal (Member queuesRead 'Partitions) partitions))
+ (Equal (Member queuesRead 'FifoQueue) fifo))
+ (Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName))
+ (Bool 'true)))
+
+ (let attrRow '(
+ )__" ATTRS_KEYS_PARAM R"__(
+ ))
+ (let attrSelect '(
+ 'DelaySeconds
+ 'MaximumMessageSize
+ 'MessageRetentionPeriod
+ 'MaxReceiveCount
+ 'VisibilityTimeout))
+ (let attrRead (SelectRow attrsTable attrRow attrSelect))
+
+ (let sameAttributes
+ (Coalesce
+ (And
+ (And
+ (And (Equal (Member attrRead 'DelaySeconds) delay)
+ (And (Equal (Member attrRead 'MaximumMessageSize) maxSize)
+ (Equal (Member attrRead 'MessageRetentionPeriod) retention)))
+ (Equal (Member attrRead 'VisibilityTimeout) visibility))
+ (Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount))
+ (Bool 'true)))
+
+ (let sameVersion
+ (Equal currentVersion expectedVersion))
+
+ (let isSame
+ (And
+ queueExists
+ (And
+ sameVersion
+ (And
+ sameAttributes
+ sameParams))))
+
+ (let existingQueueId
+ (Coalesce
+ (Member queuesRead 'QueueId)
+ (String '"")))
+
+ (return (AsList
+ (SetResult 'exists queueExists)
+ (SetResult 'sameVersion sameVersion)
+ (SetResult 'id existingQueueId)
+ (SetResult 'isSame isSame)))
+ )
+)__";
+
+
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/queues/common/queries.h b/ydb/core/ymq/queues/common/queries.h
index 4f455213fe..966e74fbc4 100644
--- a/ydb/core/ymq/queues/common/queries.h
+++ b/ydb/core/ymq/queues/common/queries.h
@@ -4,4 +4,6 @@ namespace NKikimr::NSQS {
extern const char* const GetQueueParamsQuery;
+extern const char* const MatchQueueAttributesQuery;
+
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 0ac0360ca3..8b0cc55b39 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -370,7 +370,6 @@ const char* const DeleteMessageQuery = R"__(
'('Offset (DataType 'Uint64))
'('LockTimestamp (DataType 'Uint64))))))
(let now (Parameter 'NOW (DataType 'Uint64)))
- (let shard (Parameter 'SHARD (DataType 'Uint64)))
(let dataTable ')__" QUEUE_TABLES_FOLDER_PER_SHARD_PARAM R"__(/MessageData)
(let inflyTable ')__" QUEUE_TABLES_FOLDER_PER_SHARD_PARAM R"__(/Infly)
@@ -380,7 +379,9 @@ const char* const DeleteMessageQuery = R"__(
(let records
(MapParameter keys (lambda '(item) (block '(
(let row '(
- '('Offset (Member item 'Offset))))
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
+ '('Offset (Member item 'Offset))
+ ))
(let fields '(
'Offset
'RandomId
@@ -424,23 +425,29 @@ const char* const DeleteMessageQuery = R"__(
(If deleteCond
(Map existed (lambda '(item) (block '(
(let row '(
- '('Offset (Member item 'Offset))))
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
+ '('Offset (Member item 'Offset))
+ ))
(return (EraseRow inflyTable row))))))
(AsList (Void)))
(If deleteCond
(Map existed (lambda '(item) (block '(
(let row '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('RandomId (Member item 'RandomId))
- '('Offset (Member item 'Offset))))
+ '('Offset (Member item 'Offset))
+ ))
(return (EraseRow dataTable row))))))
(AsList (Void)))
(If deleteCond
(Map existed (lambda '(item) (block '(
(let row '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('SentTimestamp (Member item 'SentTimestamp))
- '('Offset (Member item 'Offset))))
+ '('Offset (Member item 'Offset))
+ ))
(return (EraseRow sentTsIdx row))))))
(AsList (Void)))
))
diff --git a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
index 3b28a15282..bfc8734958 100644
--- a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
+++ b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
@@ -19,7 +19,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
)
def create_queue(self, is_fifo):
- if is_fifo:
+ if is_fifo and not self.queue_name.endswith('.fifo'):
self.queue_name = self.queue_name + '.fifo'
return self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo)
@@ -78,6 +78,19 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
matcher=ReadResponseMatcher().with_message_ids([message_id, ])
)
+ def do_test_double_create(self, is_fifo, tables_format):
+ self._set_new_format_settings(tables_format=tables_format)
+ self.create_queue(is_fifo)
+ self.create_queue(is_fifo)
+
+ @pytest.mark.parametrize(**IS_FIFO_PARAMS)
+ def test_double_create(self, is_fifo):
+ self.do_test_double_create(is_fifo, tables_format='1')
+
+ @pytest.mark.parametrize(**IS_FIFO_PARAMS)
+ def test_double_create_old(self, is_fifo):
+ self.do_test_double_create(is_fifo, tables_format='0')
+
class TestMultiplexingTablesFormatWithTenant(get_test_with_sqs_tenant_installation(MultiplexingTablesFormatTest)):
pass