diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-20 13:03:41 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-20 13:03:41 +0300 |
commit | d0fa97cb78792efbce0aff1f2314c324e07e8113 (patch) | |
tree | 099dbdc832474aa9c7cea5ad7f7fafccd3d7748f | |
parent | 24e3379aa601d4d735548ff2b996fdae91ef360b (diff) | |
download | ydb-d0fa97cb78792efbce0aff1f2314c324e07e8113.tar.gz |
sqs: fix retention query
fix retention request
fifo fixed
ref:7c527517687ee91300d2d08fefe0ad9a87e73dd9
-rw-r--r-- | ydb/core/ymq/actor/CMakeLists.txt | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/action.h | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/fifo_cleanup.cpp | 12 | ||||
-rw-r--r-- | ydb/core/ymq/actor/fifo_cleanup.h | 3 | ||||
-rw-r--r-- | ydb/core/ymq/actor/purge.cpp | 32 | ||||
-rw-r--r-- | ydb/core/ymq/actor/purge.h | 9 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 27 | ||||
-rw-r--r-- | ydb/core/ymq/actor/retention.cpp | 14 | ||||
-rw-r--r-- | ydb/core/ymq/actor/retention.h | 3 | ||||
-rw-r--r-- | ydb/core/ymq/actor/set_queue_attributes.cpp | 5 | ||||
-rw-r--r-- | ydb/core/ymq/base/queue_path.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_defs.h | 3 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.cpp | 13 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.h | 15 | ||||
-rw-r--r-- | ydb/core/ymq/queues/fifo/queries.cpp | 12 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 44 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/sqs_test_base.py | 42 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/test_garbage_collection.py | 24 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/test_multiplexing_tables_format.py | 18 |
19 files changed, 224 insertions, 64 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt index 2e9cc34750..5b7f1b8836 100644 --- a/ydb/core/ymq/actor/CMakeLists.txt +++ b/ydb/core/ymq/actor/CMakeLists.txt @@ -110,3 +110,8 @@ generate_enum_serilization(core-ymq-actor INCLUDE_HEADERS ydb/core/ymq/actor/metering.h ) +generate_enum_serilization(core-ymq-actor + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/fifo_cleanup.h + INCLUDE_HEADERS + ydb/core/ymq/actor/fifo_cleanup.h +) diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index 4026d5a7f9..ee01dc2528 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -185,6 +185,11 @@ protected: return *IsFifo_; } + virtual bool TablesFormat() const { + Y_VERIFY(TablesFormat_); + return *TablesFormat_; + } + virtual void DoStart() { } virtual void DoFinish() { } diff --git a/ydb/core/ymq/actor/fifo_cleanup.cpp b/ydb/core/ymq/actor/fifo_cleanup.cpp index 218deb0fcc..5d49c6f904 100644 --- a/ydb/core/ymq/actor/fifo_cleanup.cpp +++ b/ydb/core/ymq/actor/fifo_cleanup.cpp @@ -6,6 +6,7 @@ #include <ydb/public/lib/value/value.h> #include <ydb/core/base/appdata.h> #include <ydb/core/ymq/base/debug_info.h> +#include <ydb/core/ymq/queues/common/key_hashes.h> #include <library/cpp/actors/core/hfunc.h> @@ -13,8 +14,14 @@ namespace NKikimr::NSQS { -TCleanupActor::TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType) +TCleanupActor::TCleanupActor( + const TQueuePath& queuePath, + ui32 tablesFormat, + const TActorId& queueLeader, + ECleanupType cleanupType +) : QueuePath_(queuePath) + , TablesFormat_(tablesFormat) , RequestId_(CreateGuidAsString()) , QueueLeader_(queueLeader) , CleanupType(cleanupType) @@ -77,9 +84,12 @@ void TCleanupActor::RunCleanupQuery() { .User(QueuePath_.UserName) .Queue(QueuePath_.QueueName) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat_) .QueryId(GetCleanupQueryId()) .RetryOnTimeout() .Params() + .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version)) .Uint64("NOW", Now().MilliSeconds()) .Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize()); diff --git a/ydb/core/ymq/actor/fifo_cleanup.h b/ydb/core/ymq/actor/fifo_cleanup.h index edf6040eec..4aa9443c89 100644 --- a/ydb/core/ymq/actor/fifo_cleanup.h +++ b/ydb/core/ymq/actor/fifo_cleanup.h @@ -14,7 +14,7 @@ public: Reads, }; - TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType); + TCleanupActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader, ECleanupType cleanupType); ~TCleanupActor(); void Bootstrap(); @@ -39,6 +39,7 @@ private: private: const TQueuePath QueuePath_; + const ui32 TablesFormat_; const TString RequestId_; const TActorId QueueLeader_; const ECleanupType CleanupType; diff --git a/ydb/core/ymq/actor/purge.cpp b/ydb/core/ymq/actor/purge.cpp index aff713d784..8000b00665 100644 --- a/ydb/core/ymq/actor/purge.cpp +++ b/ydb/core/ymq/actor/purge.cpp @@ -8,13 +8,21 @@ #include <ydb/core/ymq/base/counters.h> #include <ydb/core/ymq/base/debug_info.h> #include <ydb/core/ymq/base/query_id.h> +#include <ydb/core/ymq/queues/common/key_hashes.h> using NKikimr::NClient::TValue; namespace NKikimr::NSQS { -TPurgeActor::TPurgeActor(const TQueuePath& queuePath, TIntrusivePtr<TQueueCounters> counters, const TActorId& queueLeader, bool isFifo) +TPurgeActor::TPurgeActor( + const TQueuePath& queuePath, + ui32 tablesFormat, + TIntrusivePtr<TQueueCounters> counters, + const TActorId& queueLeader, + bool isFifo +) : QueuePath_(queuePath) + , TablesFormat_(tablesFormat) , RequestId_(CreateGuidAsString()) , Counters_(std::move(counters)) , QueueLeader_(queueLeader) @@ -63,11 +71,16 @@ void TPurgeActor::MakeGetRetentionOffsetRequest(const ui64 shardId, TShard* shar .Queue(QueuePath_.QueueName) .Shard(shardId) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat_) .QueryId(GET_RETENTION_OFFSET_ID) .Counters(Counters_) .RetryOnTimeout() .OnExecuted(onExecuted) .Params() + .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version)) + .AddWithType("SHARD", shardId, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueuePath_.Version, shardId)) .Uint64("OFFSET_FROM", shard->PreviousSuccessfullyProcessedLastMessage.Offset) .Uint64("TIME_FROM", shard->PreviousSuccessfullyProcessedLastMessage.SentTimestamp.MilliSeconds()) .Uint64("TIME_TO", boundary.MilliSeconds()) @@ -107,15 +120,19 @@ void TPurgeActor::MakeStage1Request(const ui64 shardId, TShard* shard, const std .Queue(QueuePath_.QueueName) .Shard(shardId) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat_) .QueryId(PURGE_QUEUE_ID) .Counters(Counters_) .RetryOnTimeout() .OnExecuted(onExecuted) .Params() + .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version)) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueuePath_.Version, shardId)) .Uint64("OFFSET_FROM", offsets.first) .Uint64("OFFSET_TO", offsets.second) .Uint64("NOW", Now().MilliSeconds()) - .Uint64("SHARD", shardId) + .AddWithType("SHARD", shardId, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize()) .ParentBuilder().Start(); } @@ -180,15 +197,24 @@ void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages, .Queue(QueuePath_.QueueName) .Shard(shardId) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat_) .QueryId(PURGE_QUEUE_STAGE2_ID) .Counters(Counters_) .RetryOnTimeout() .OnExecuted(onExecuted); NClient::TWriteValue params = builder.ParamsValue(); + + params["QUEUE_ID_NUMBER"] = QueuePath_.Version; + params["QUEUE_ID_NUMBER_HASH"] = GetKeysHash(QueuePath_.Version); + params["QUEUE_ID_NUMBER_AND_SHARD_HASH"] = GetKeysHash(QueuePath_.Version, shardId); params["CLEANUP_VERSION"] = cleanupVersion; - params["SHARD"] = shardId; params["NOW"] = TActivationContext::Now().MilliSeconds(); + if (TablesFormat_ == 0) { + params["SHARD"] = shardId; + } else { + params["SHARD"] = static_cast<ui32>(shardId); + } auto messagesParam = params["MESSAGES"]; FillMessagesParam(messagesParam, messages, shard->CurrentLastMessage.Offset, shard->CurrentLastMessage.SentTimestamp); diff --git a/ydb/core/ymq/actor/purge.h b/ydb/core/ymq/actor/purge.h index 2a16a5aa24..ac9d191d04 100644 --- a/ydb/core/ymq/actor/purge.h +++ b/ydb/core/ymq/actor/purge.h @@ -27,7 +27,13 @@ class TPurgeActor : public TActorBootstrapped<TPurgeActor> { }; public: - TPurgeActor(const TQueuePath& queuePath, TIntrusivePtr<TQueueCounters> counters, const TActorId& queueLeader, bool isFifo); + TPurgeActor( + const TQueuePath& queuePath, + ui32 tablesFormat, + TIntrusivePtr<TQueueCounters> counters, + const TActorId& queueLeader, + bool isFifo + ); ~TPurgeActor(); void Bootstrap(); @@ -56,6 +62,7 @@ private: private: const TQueuePath QueuePath_; + const ui32 TablesFormat_; /// A state of shard processing TMap<ui64, TShard> Shards_; const TString RequestId_; diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 250731b0df..d9f79471f7 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -1135,6 +1135,10 @@ void TQueueLeader::ProcessChangeMessageVisibilityBatch(TChangeMessageVisibilityB .OnExecuted([this, requestId = req.RequestId, shard = req.Shard](const TSqsEvents::TEvExecuted::TRecord& ev) { OnVisibilityChanged(requestId, shard, ev); }); builder.Params() + .Uint64("QUEUE_ID_NUMBER", QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) + .AddWithType("SHARD", req.Shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, req.Shard)) .Uint64("NOW", req.NowTimestamp.MilliSeconds()) .Uint64("GROUPS_READ_ATTEMPT_IDS_PERIOD", Cfg().GetGroupsReadAttemptIdsPeriodMs()); NClient::TWriteValue params = builder.ParamsValue(); @@ -1515,6 +1519,7 @@ void TQueueLeader::RequestMessagesCountMetrics(ui64 shard) { .Params() .Uint64("QUEUE_ID_NUMBER", QueueVersion_) .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard)) .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .ParentBuilder().Start(); ++MetricsQueriesInfly_; @@ -1540,8 +1545,10 @@ void TQueueLeader::RequestOldestTimestampMetrics(ui64 shard) { .OnExecuted([this, shard](const TSqsEvents::TEvExecuted::TRecord& ev) { ReceiveOldestTimestampMetrics(shard, ev); }) .Counters(Counters_) .Params() - .Uint64("QUEUE_ID_QUEUE_ID_NUMBER", QueueVersion_) + .Uint64("QUEUE_ID_NUMBER", QueueVersion_) .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) + .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) + .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard)) .Uint64("TIME_FROM", Shards_[shard].LastSuccessfulOldestMessageTimestampValueMs) // optimization for accurate range selection // timestamp is always nondecreasing .ParentBuilder().Start(); ++MetricsQueriesInfly_; @@ -1665,21 +1672,27 @@ void TQueueLeader::CreateBackgroundActors() { } if (IsFifoQueue_) { + auto createCleaner = [&](TCleanupActor::ECleanupType type) { + auto actor = Register(new TCleanupActor(GetQueuePath(), TablesFormat_, SelfId(), type)); + LOG_SQS_DEBUG( + "Created new " << type << " cleanup actor for queue " << TLogQueueName(UserName_, QueueName_) + << ". Actor id: " << actor + ); + return actor; + }; if (!DeduplicationCleanupActor_) { - DeduplicationCleanupActor_ = Register(new TCleanupActor(GetQueuePath(), SelfId(), TCleanupActor::ECleanupType::Deduplication)); - LOG_SQS_DEBUG("Created new deduplication cleanup actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << DeduplicationCleanupActor_); + DeduplicationCleanupActor_ = createCleaner(TCleanupActor::ECleanupType::Deduplication); } if (!ReadsCleanupActor_) { - ReadsCleanupActor_ = Register(new TCleanupActor(GetQueuePath(), SelfId(), TCleanupActor::ECleanupType::Reads)); - LOG_SQS_DEBUG("Created new reads cleanup actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << ReadsCleanupActor_); + ReadsCleanupActor_ = createCleaner(TCleanupActor::ECleanupType::Reads); } } if (!RetentionActor_) { - RetentionActor_ = Register(new TRetentionActor(GetQueuePath(), SelfId())); + RetentionActor_ = Register(new TRetentionActor(GetQueuePath(), TablesFormat_, SelfId())); LOG_SQS_DEBUG("Created new retention actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << RetentionActor_); } if (!PurgeActor_) { - PurgeActor_ = Register(new TPurgeActor(GetQueuePath(), Counters_, SelfId(), IsFifoQueue_)); + PurgeActor_ = Register(new TPurgeActor(GetQueuePath(), TablesFormat_, Counters_, SelfId(), IsFifoQueue_)); LOG_SQS_DEBUG("Created new purge actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << PurgeActor_); } } diff --git a/ydb/core/ymq/actor/retention.cpp b/ydb/core/ymq/actor/retention.cpp index 9e610e8ed2..d5a0995559 100644 --- a/ydb/core/ymq/actor/retention.cpp +++ b/ydb/core/ymq/actor/retention.cpp @@ -6,14 +6,16 @@ #include <ydb/public/lib/value/value.h> #include <ydb/core/base/appdata.h> #include <ydb/core/ymq/base/debug_info.h> +#include <ydb/core/ymq/queues/common/key_hashes.h> #include <library/cpp/actors/core/hfunc.h> #include <util/random/random.h> namespace NKikimr::NSQS { -TRetentionActor::TRetentionActor(const TQueuePath& queuePath, const TActorId& queueLeader) +TRetentionActor::TRetentionActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader) : QueuePath_(queuePath) + , TablesFormat_(tablesFormat) , RequestId_(CreateGuidAsString()) , QueueLeader_(queueLeader) { @@ -49,8 +51,11 @@ void TRetentionActor::SetRetentionBoundary() { auto req = MakeHolder<TSqsEvents::TEvPurgeQueue>(); req->QueuePath = QueuePath_; req->Boundary = TInstant::MilliSeconds(ui64(list[i]["RetentionBoundary"])); - req->Shard = ui64(list[i]["Shard"]); - + if (TablesFormat_ == 0) { + req->Shard = list[i]["Shard"]; + } else { + req->Shard = static_cast<ui32>(list[i]["Shard"]); + } RLOG_SQS_INFO("Set retention boundary for queue " << TLogQueueName(QueuePath_, req->Shard) << " to " << req->Boundary.MilliSeconds() << " (" << req->Boundary << ")"); Send(QueueLeader_, std::move(req)); @@ -66,10 +71,13 @@ void TRetentionActor::SetRetentionBoundary() { .User(QueuePath_.UserName) .Queue(QueuePath_.QueueName) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat_) .QueryId(SET_RETENTION_ID) .RetryOnTimeout() .OnExecuted(onExecuted) .Params() + .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version)) .Uint64("NOW", Now().MilliSeconds()) .Bool("PURGE", false) .ParentBuilder().Start(); diff --git a/ydb/core/ymq/actor/retention.h b/ydb/core/ymq/actor/retention.h index 724baddff3..e2413fafdc 100644 --- a/ydb/core/ymq/actor/retention.h +++ b/ydb/core/ymq/actor/retention.h @@ -9,7 +9,7 @@ namespace NKikimr::NSQS { class TRetentionActor : public TActorBootstrapped<TRetentionActor> { public: - TRetentionActor(const TQueuePath& queuePath, const TActorId& queueLeader); + TRetentionActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader); ~TRetentionActor(); void Bootstrap(); @@ -32,6 +32,7 @@ private: private: const TQueuePath QueuePath_; + const ui32 TablesFormat_; const TString RequestId_; const TActorId QueueLeader_; }; diff --git a/ydb/core/ymq/actor/set_queue_attributes.cpp b/ydb/core/ymq/actor/set_queue_attributes.cpp index df33ad99e6..492a2021a1 100644 --- a/ydb/core/ymq/actor/set_queue_attributes.cpp +++ b/ydb/core/ymq/actor/set_queue_attributes.cpp @@ -8,6 +8,7 @@ #include <ydb/core/ymq/base/limits.h> #include <ydb/core/ymq/base/dlq_helpers.h> #include <ydb/core/ymq/base/queue_attributes.h> +#include <ydb/core/ymq/queues/common/key_hashes.h> #include <ydb/public/lib/value/value.h> #include <library/cpp/scheme/scheme.h> @@ -71,10 +72,14 @@ private: .User(UserName_) .Queue(GetQueueName()) .QueueLeader(QueueLeader_) + .TablesFormat(TablesFormat()) .QueryId(SET_QUEUE_ATTRIBUTES_ID) .Counters(QueueCounters_) .RetryOnTimeout(); + builder.Params().Uint64("QUEUE_ID_NUMBER", QueueVersion_.GetRef()); + builder.Params().Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_.GetRef())); + builder.Params().OptionalUint64("MAX_RECEIVE_COUNT", ValidatedAttributes_.RedrivePolicy.MaxReceiveCount); builder.Params().OptionalUtf8("DLQ_TARGET_ARN", ValidatedAttributes_.RedrivePolicy.TargetArn); builder.Params().OptionalUtf8("DLQ_TARGET_NAME", ValidatedAttributes_.RedrivePolicy.TargetQueueName); diff --git a/ydb/core/ymq/base/queue_path.h b/ydb/core/ymq/base/queue_path.h index baaa781b00..c8154c245b 100644 --- a/ydb/core/ymq/base/queue_path.h +++ b/ydb/core/ymq/base/queue_path.h @@ -8,6 +8,7 @@ struct TQueuePath { TString Root; TString UserName; TString QueueName; + ui64 Version; TString VersionSuffix; TQueuePath() @@ -20,6 +21,7 @@ struct TQueuePath { : Root(root) , UserName(userName) , QueueName(queueName) + , Version(version) { if (version) { VersionSuffix = TString::Join("v", ToString(version)); diff --git a/ydb/core/ymq/queues/common/db_queries_defs.h b/ydb/core/ymq/queues/common/db_queries_defs.h index 4ec1ebb8a0..8179ef96ec 100644 --- a/ydb/core/ymq/queues/common/db_queries_defs.h +++ b/ydb/core/ymq/queues/common/db_queries_defs.h @@ -23,3 +23,6 @@ #define DLQ_ID_AND_SHARD_KEYS_PARAM "%17$s"
#define DLQ_SHARD_TYPE_PARAM "%18$s"
#define DLQ_STATE_KEYS_PARAM "%19$s"
+
+#define SELECT_QUEUE_AND_SHARD_HASH_PARAM "%20$s"
+#define LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM "%21$s"
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp index 47c16653ff..83915a8039 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.cpp +++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp @@ -5,6 +5,14 @@ namespace NKikimr::NSQS {
+ const char* TDbQueriesMaker::GetSelectQueueAndShardHash() const {
+ return TablesFormat_ == 1 ? "'QueueIdNumberAndShardHash" : "";
+ }
+
+ const char* TDbQueriesMaker::GetLoadQueueAndShardHashOrZero() const {
+ return TablesFormat_ == 1 ? "Member item 'QueueIdNumberAndShardHash" : "Uint64 '0";
+ }
+
TString TDbQueriesMaker::FillQuery(const char* query) const {
return Sprintf(
query,
@@ -29,7 +37,10 @@ namespace NKikimr::NSQS { GetDlqIdKeys(), // 16
GetDlqIdAndShardKeys(), // 17
GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys() // 19
+ GetDlqStateKeys(), // 19
+
+ GetSelectQueueAndShardHash(), // 20
+ GetLoadQueueAndShardHashOrZero() // 21
);
}
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h index e2da48d51d..e4514a0160 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.h +++ b/ydb/core/ymq/queues/common/db_queries_maker.h @@ -97,7 +97,17 @@ private: const char* GetAllShardsRange() const {
if (TablesFormat_ == 1) {
- return "'('QueueIdNumber queueIdNumber queueIdNumber)";
+ if (IsFifo_) {
+ return R"__(
+ '('QueueIdNumberHash (Uint64 '0) (Uint64 '18446744073709551615))
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ )__";
+ }
+ return R"__(
+ '('QueueIdNumberAndShardHash (Uint64 '0) (Uint64 '18446744073709551615))
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ '('Shard (Uint32 '0) (Uint32 '4294967295))
+ )__";
}
return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
}
@@ -129,6 +139,9 @@ private: return tablesFormat == 1 ? "Uint32" : "Uint64";
}
+ const char* GetSelectQueueAndShardHash() const;
+ const char* GetLoadQueueAndShardHashOrZero() const;
+
void FillQueueVars(
const TString& userName,
const TString& queueName,
diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp index 4dd8494c76..0d5c5239e2 100644 --- a/ydb/core/ymq/queues/fifo/queries.cpp +++ b/ydb/core/ymq/queues/fifo/queries.cpp @@ -1253,7 +1253,10 @@ static const char* const DeduplicationCleanupQuery = R"__( (Map dedupToErase (lambda '(item) (block '( (return (EraseRow dedupTable '( - '('DedupId (Member item 'DedupId))))))))) + )__" QUEUE_ID_KEYS_PARAM R"__( + '('DedupId (Member item 'DedupId))) + )) + )))) )) ) )__"; @@ -1290,7 +1293,10 @@ static const char* const ReadsCleanupQuery = R"__( (Map readsToErase (lambda '(item) (block '( (return (EraseRow readsTable '( - '('ReceiveAttemptId (Member item 'ReceiveAttemptId))))))))) + )__" QUEUE_ID_KEYS_PARAM R"__( + '('ReceiveAttemptId (Member item 'ReceiveAttemptId)) + ))) + )))) )) ) )__"; @@ -1331,7 +1337,7 @@ const char* const SetRetentionQuery = R"__( (Bool 'false))) (return (AsStruct - '('Shard (Uint64 '0)) + '('Shard ()__" SHARD_TYPE_PARAM R"__( '0)) '('RetentionBoundary (Max boundary (Member item 'RetentionBoundary))) '('Updated updated)))))))) diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index 8b0cc55b39..23ee4ab5d6 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -201,9 +201,13 @@ const char* const PurgeQueueQuery = R"__( (let modifiedTimestamp (Max now (Member stateRead 'LastModifiedTimestamp))) (let messageRange '( - '('Offset offsetFrom offsetTo))) + )__" QUEUE_ID_AND_SHARD_KEYS_RANGE_PARAM R"__( + '('Offset offsetFrom offsetTo) + )) (let inflyRange '( - '('Offset offsetFrom offsetTo))) + )__" QUEUE_ID_AND_SHARD_KEYS_RANGE_PARAM R"__( + '('Offset offsetFrom offsetTo) + )) (let messageSelect '( 'SentTimestamp 'Offset @@ -239,7 +243,6 @@ const char* const PurgeQueueStage2Query = R"__( (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) - (let shard (Parameter 'SHARD (DataType 'Uint64))) (let cleanupVersion (Parameter 'CLEANUP_VERSION (DataType 'Uint64))) (let now (Parameter 'NOW (DataType 'Uint64))) (let messages (Parameter 'MESSAGES @@ -271,6 +274,7 @@ const char* const PurgeQueueStage2Query = R"__( (let inflyRecords (MapParameter messages (lambda '(item) (block '( (let row '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('Offset (Member item 'Offset)))) (let fields '( 'Offset @@ -285,6 +289,7 @@ const char* const PurgeQueueStage2Query = R"__( (let messageRecords (MapParameter messages (lambda '(item) (block '( (let row '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('Offset (Member item 'Offset)))) (let fields '( 'Offset @@ -332,27 +337,39 @@ const char* const PurgeQueueStage2Query = R"__( (If versionIsSame (Map inflyRecordsExisted (lambda '(item) (block '( (return (EraseRow inflyTable '( - '('Offset (Member item 'Offset))))))))) + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( + '('Offset (Member item 'Offset)) + ))) + )))) (AsList (Void))) (If versionIsSame (Map messages (lambda '(item) (block '( (return (EraseRow dataTable '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('RandomId (Member item 'RandomId)) - '('Offset (Member item 'Offset))))))))) + '('Offset (Member item 'Offset)) + ))) + )))) (AsList (Void))) (If versionIsSame (Map messageRecordsExisted (lambda '(item) (block '( (return (EraseRow msgTable '( - '('Offset (Member item 'Offset))))))))) + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( + '('Offset (Member item 'Offset)) + ))) + )))) (AsList (Void))) (If versionIsSame (Map messages (lambda '(item) (block '( (return (EraseRow sentTsIdx '( + )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('SentTimestamp (Member item 'SentTimestamp)) - '('Offset (Member item 'Offset))))))))) + '('Offset (Member item 'Offset)) + ))) + )))) (AsList (Void))) )) ) @@ -999,11 +1016,10 @@ const char* const WriteMessageQuery = R"__( ) )__"; -// range more slow??? -// check column renames test const char* const SetRetentionQuery = R"__( ( - (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let now (Parameter 'NOW (DataType 'Uint64))) (let purge (Parameter 'PURGE (DataType 'Bool))) @@ -1025,7 +1041,9 @@ const char* const SetRetentionQuery = R"__( )) (let fields '( ')__" SHARD_COLUMN_NAME_PARAM R"__( - 'RetentionBoundary)) + )__" SELECT_QUEUE_AND_SHARD_HASH_PARAM R"__( + 'RetentionBoundary + )) (let records (Member (SelectRange stateTable range fields '()) 'List)) (let result @@ -1039,6 +1057,9 @@ const char* const SetRetentionQuery = R"__( '('Shard ( Member item ')__" SHARD_COLUMN_NAME_PARAM R"__( )) + '('QueueIdNumberAndShardHash ( + )__" LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM R"__( + )) '('RetentionBoundary (Max boundary (Member item 'RetentionBoundary))) '('Updated updated)))))))) @@ -1051,6 +1072,7 @@ const char* const SetRetentionQuery = R"__( (Map updated (lambda '(item) (block '( (let shard (Member item 'Shard)) + (let queueIdNumberAndShardHash (Member item 'QueueIdNumberAndShardHash)) (let row '( )__" STATE_KEYS_PARAM R"__( )) diff --git a/ydb/tests/functional/sqs/sqs_test_base.py b/ydb/tests/functional/sqs/sqs_test_base.py index 05d5303f64..de6623b887 100644 --- a/ydb/tests/functional/sqs/sqs_test_base.py +++ b/ydb/tests/functional/sqs/sqs_test_base.py @@ -35,6 +35,12 @@ IS_FIFO_PARAMS = { 'ids': ['fifo', 'std'], } +TABLES_FORMAT_PARAMS = { + 'argnames': 'tables_format', + 'argvalues': [0, 1], + 'ids': ['tables_format_v0', 'tables_format_v1'], +} + POLLING_PARAMS = { 'argnames': 'polling_wait_timeout', 'argvalues': [0, 1], @@ -144,6 +150,7 @@ class KikimrSqsTestBase(object): database = '/Root' sqs_root = '/Root/SQS' use_in_memory_pdisks = True + tables_format_per_user = {} @classmethod def setup_class(cls): @@ -627,14 +634,18 @@ class KikimrSqsTestBase(object): time.sleep(1) # wait node to start def _smart_make_table_path(self, user_name, queue_name, queue_version, shard, table_name): - table_path = '{}/{}'.format(self.sqs_root, user_name) - if queue_name is not None: - table_path += '/{}'.format(queue_name) - if queue_version is not None and queue_version != 0: - table_path += '/v{}'.format(queue_version) - if shard is not None: - table_path += '/{}'.format(shard) - + tables_format = self.tables_format_per_user.get(user_name, 0) + table_path = self.sqs_root + if tables_format == 0: + table_path += '/{}'.format(user_name) + if queue_name is not None: + table_path += '/{}'.format(queue_name) + if queue_version is not None and queue_version != 0: + table_path += '/v{}'.format(queue_version) + if shard is not None: + table_path += '/{}'.format(shard) + else: + table_path += '/{}'.format('.FIFO' if queue_name.endswith('.fifo') else '.STD') return table_path + '/{}'.format(table_name) def _get_queue_version_number(self, user_name, queue_name): @@ -751,3 +762,18 @@ class KikimrSqsTestBase(object): for shard in range(shards): session.drop_table(self._smart_make_table_path(username, queuename, version, shard, 'Messages')) session.drop_table(self._smart_make_table_path(username, queuename, version, shard, 'MessageData')) + + def _set_tables_format(self, username=None, tables_format=1): + if username is None: + username = self._username + self._execute_yql_query( + f'UPSERT INTO `{self.sqs_root}/.Settings` (Account, Name, Value) \ + VALUES ("{username}", "CreateQueuesWithTabletFormat", "{tables_format}")' + ) + self.tables_format_per_user[username] = tables_format + + def _init_with_params(self, is_fifo=None, tables_format=None): + if is_fifo and not self.queue_name.endswith('.fifo'): + self.queue_name += '.fifo' + if tables_format is not None: + self._set_tables_format(tables_format=tables_format) diff --git a/ydb/tests/functional/sqs/test_garbage_collection.py b/ydb/tests/functional/sqs/test_garbage_collection.py index 992c8a6513..aff51e3bcd 100644 --- a/ydb/tests/functional/sqs/test_garbage_collection.py +++ b/ydb/tests/functional/sqs/test_garbage_collection.py @@ -13,7 +13,7 @@ from sqs_requests_client import SqsHttpApi from sqs_matchers import ReadResponseMatcher from sqs_test_base import to_bytes -from sqs_test_base import KikimrSqsTestBase, VISIBILITY_CHANGE_METHOD_PARAMS, IS_FIFO_PARAMS +from sqs_test_base import KikimrSqsTestBase, VISIBILITY_CHANGE_METHOD_PARAMS, IS_FIFO_PARAMS, TABLES_FORMAT_PARAMS def send_message(server, username, queue_url, sqs_port, body, seq_no, group_id): @@ -113,9 +113,10 @@ class TestSqsGarbageCollection(KikimrSqsTestBase): break @pytest.mark.parametrize(**IS_FIFO_PARAMS) - def test_removes_messages_by_retention_time(self, is_fifo): - if is_fifo: - self.queue_name = self.queue_name + '.fifo' + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_removes_messages_by_retention_time(self, is_fifo, tables_format): + self._init_with_params(is_fifo, tables_format) + self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo) number_of_mesages_to_write = 110 if is_fifo else 220 @@ -166,8 +167,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase): assert_that(number_of_messages, equal_to(0)) self._check_queue_tables_are_empty() - def test_cleanups_deduplication_table(self): - self.queue_name = self.queue_name + '.fifo' + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_cleanups_deduplication_table(self, tables_format): + self._init_with_params(is_fifo=True, tables_format=tables_format) self._create_queue_and_assert(self.queue_name, is_fifo=True) # do the same again to ensure that this process will not stop @@ -177,8 +179,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase): self.wait_fifo_table_empty('Deduplication') @pytest.mark.parametrize('random_groups_max_count', [30, 200]) - def test_cleanups_reads_table(self, random_groups_max_count): - self.queue_name = self.queue_name + '.fifo' + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_cleanups_reads_table(self, random_groups_max_count, tables_format): + self._init_with_params(is_fifo=True, tables_format=tables_format) self._create_queue_and_assert(self.queue_name, is_fifo=True) # do the same again to ensure that this process will not stop @@ -194,8 +197,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase): self.delete_messages(self.queue_url, [r['ReceiptHandle'] for r in read_result]) @pytest.mark.parametrize(**VISIBILITY_CHANGE_METHOD_PARAMS) - def test_visibility_change_cleanups_proper_receive_attempt_id(self, delete_message): - self.queue_name = self.queue_name + '.fifo' + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_visibility_change_cleanups_proper_receive_attempt_id(self, delete_message, tables_format): + self._init_with_params(is_fifo=True, tables_format=tables_format) queue_url = self._create_queue_and_assert(self.queue_name, is_fifo=True) groups_count = 5 for group_number in range(groups_count): diff --git a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py index bfc8734958..e98e0c8dd0 100644 --- a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py +++ b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py @@ -10,14 +10,6 @@ from sqs_matchers import ReadResponseMatcher class MultiplexingTablesFormatTest(KikimrSqsTestBase): - def _set_new_format_settings(self, username=None, tables_format=1): - if username is None: - username = self._username - self._execute_yql_query( - f'UPSERT INTO `{self.sqs_root}/.Settings` (Account, Name, Value) \ - VALUES ("{username}", "CreateQueuesWithTabletFormat", "{tables_format}")' - ) - def create_queue(self, is_fifo): if is_fifo and not self.queue_name.endswith('.fifo'): self.queue_name = self.queue_name + '.fifo' @@ -31,13 +23,13 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): assert(False) def create_queue_with_wrong_tables_format(self, tables_format): - self._set_new_format_settings(tables_format='qwerty') + self._set_tables_format(tables_format='qwerty') self.create_queue_must_fail(True) self.create_queue_must_fail(False) @pytest.mark.parametrize(**IS_FIFO_PARAMS) def test_create_queue(self, is_fifo): - self._set_new_format_settings() + self._set_tables_format() self.create_queue(is_fifo) def test_create_queue_with_incorrect_tables_format(self): @@ -51,7 +43,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): @pytest.mark.parametrize(**IS_FIFO_PARAMS) def test_send_message(self, is_fifo): - self._set_new_format_settings() + self._set_tables_format() created_queue_url = self.create_queue(is_fifo) self._send_message_and_assert( created_queue_url, @@ -62,7 +54,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): @pytest.mark.parametrize(**IS_FIFO_PARAMS) def test_read_message(self, is_fifo): - self._set_new_format_settings() + self._set_tables_format() created_queue_url = self.create_queue(is_fifo) self.seq_no += 1 message_id = self._send_message_and_assert( @@ -79,7 +71,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase): ) def do_test_double_create(self, is_fifo, tables_format): - self._set_new_format_settings(tables_format=tables_format) + self._set_tables_format(tables_format=tables_format) self.create_queue(is_fifo) self.create_queue(is_fifo) |