aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-20 13:03:41 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-20 13:03:41 +0300
commitd0fa97cb78792efbce0aff1f2314c324e07e8113 (patch)
tree099dbdc832474aa9c7cea5ad7f7fafccd3d7748f
parent24e3379aa601d4d735548ff2b996fdae91ef360b (diff)
downloadydb-d0fa97cb78792efbce0aff1f2314c324e07e8113.tar.gz
sqs: fix retention query
fix retention request fifo fixed ref:7c527517687ee91300d2d08fefe0ad9a87e73dd9
-rw-r--r--ydb/core/ymq/actor/CMakeLists.txt5
-rw-r--r--ydb/core/ymq/actor/action.h5
-rw-r--r--ydb/core/ymq/actor/fifo_cleanup.cpp12
-rw-r--r--ydb/core/ymq/actor/fifo_cleanup.h3
-rw-r--r--ydb/core/ymq/actor/purge.cpp32
-rw-r--r--ydb/core/ymq/actor/purge.h9
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp27
-rw-r--r--ydb/core/ymq/actor/retention.cpp14
-rw-r--r--ydb/core/ymq/actor/retention.h3
-rw-r--r--ydb/core/ymq/actor/set_queue_attributes.cpp5
-rw-r--r--ydb/core/ymq/base/queue_path.h2
-rw-r--r--ydb/core/ymq/queues/common/db_queries_defs.h3
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.cpp13
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.h15
-rw-r--r--ydb/core/ymq/queues/fifo/queries.cpp12
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp44
-rw-r--r--ydb/tests/functional/sqs/sqs_test_base.py42
-rw-r--r--ydb/tests/functional/sqs/test_garbage_collection.py24
-rw-r--r--ydb/tests/functional/sqs/test_multiplexing_tables_format.py18
19 files changed, 224 insertions, 64 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt
index 2e9cc34750..5b7f1b8836 100644
--- a/ydb/core/ymq/actor/CMakeLists.txt
+++ b/ydb/core/ymq/actor/CMakeLists.txt
@@ -110,3 +110,8 @@ generate_enum_serilization(core-ymq-actor
INCLUDE_HEADERS
ydb/core/ymq/actor/metering.h
)
+generate_enum_serilization(core-ymq-actor
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/fifo_cleanup.h
+ INCLUDE_HEADERS
+ ydb/core/ymq/actor/fifo_cleanup.h
+)
diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h
index 4026d5a7f9..ee01dc2528 100644
--- a/ydb/core/ymq/actor/action.h
+++ b/ydb/core/ymq/actor/action.h
@@ -185,6 +185,11 @@ protected:
return *IsFifo_;
}
+ virtual bool TablesFormat() const {
+ Y_VERIFY(TablesFormat_);
+ return *TablesFormat_;
+ }
+
virtual void DoStart() { }
virtual void DoFinish() { }
diff --git a/ydb/core/ymq/actor/fifo_cleanup.cpp b/ydb/core/ymq/actor/fifo_cleanup.cpp
index 218deb0fcc..5d49c6f904 100644
--- a/ydb/core/ymq/actor/fifo_cleanup.cpp
+++ b/ydb/core/ymq/actor/fifo_cleanup.cpp
@@ -6,6 +6,7 @@
#include <ydb/public/lib/value/value.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/ymq/base/debug_info.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -13,8 +14,14 @@
namespace NKikimr::NSQS {
-TCleanupActor::TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType)
+TCleanupActor::TCleanupActor(
+ const TQueuePath& queuePath,
+ ui32 tablesFormat,
+ const TActorId& queueLeader,
+ ECleanupType cleanupType
+)
: QueuePath_(queuePath)
+ , TablesFormat_(tablesFormat)
, RequestId_(CreateGuidAsString())
, QueueLeader_(queueLeader)
, CleanupType(cleanupType)
@@ -77,9 +84,12 @@ void TCleanupActor::RunCleanupQuery() {
.User(QueuePath_.UserName)
.Queue(QueuePath_.QueueName)
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat_)
.QueryId(GetCleanupQueryId())
.RetryOnTimeout()
.Params()
+ .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version))
.Uint64("NOW", Now().MilliSeconds())
.Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize());
diff --git a/ydb/core/ymq/actor/fifo_cleanup.h b/ydb/core/ymq/actor/fifo_cleanup.h
index edf6040eec..4aa9443c89 100644
--- a/ydb/core/ymq/actor/fifo_cleanup.h
+++ b/ydb/core/ymq/actor/fifo_cleanup.h
@@ -14,7 +14,7 @@ public:
Reads,
};
- TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType);
+ TCleanupActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader, ECleanupType cleanupType);
~TCleanupActor();
void Bootstrap();
@@ -39,6 +39,7 @@ private:
private:
const TQueuePath QueuePath_;
+ const ui32 TablesFormat_;
const TString RequestId_;
const TActorId QueueLeader_;
const ECleanupType CleanupType;
diff --git a/ydb/core/ymq/actor/purge.cpp b/ydb/core/ymq/actor/purge.cpp
index aff713d784..8000b00665 100644
--- a/ydb/core/ymq/actor/purge.cpp
+++ b/ydb/core/ymq/actor/purge.cpp
@@ -8,13 +8,21 @@
#include <ydb/core/ymq/base/counters.h>
#include <ydb/core/ymq/base/debug_info.h>
#include <ydb/core/ymq/base/query_id.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
using NKikimr::NClient::TValue;
namespace NKikimr::NSQS {
-TPurgeActor::TPurgeActor(const TQueuePath& queuePath, TIntrusivePtr<TQueueCounters> counters, const TActorId& queueLeader, bool isFifo)
+TPurgeActor::TPurgeActor(
+ const TQueuePath& queuePath,
+ ui32 tablesFormat,
+ TIntrusivePtr<TQueueCounters> counters,
+ const TActorId& queueLeader,
+ bool isFifo
+)
: QueuePath_(queuePath)
+ , TablesFormat_(tablesFormat)
, RequestId_(CreateGuidAsString())
, Counters_(std::move(counters))
, QueueLeader_(queueLeader)
@@ -63,11 +71,16 @@ void TPurgeActor::MakeGetRetentionOffsetRequest(const ui64 shardId, TShard* shar
.Queue(QueuePath_.QueueName)
.Shard(shardId)
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat_)
.QueryId(GET_RETENTION_OFFSET_ID)
.Counters(Counters_)
.RetryOnTimeout()
.OnExecuted(onExecuted)
.Params()
+ .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version))
+ .AddWithType("SHARD", shardId, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueuePath_.Version, shardId))
.Uint64("OFFSET_FROM", shard->PreviousSuccessfullyProcessedLastMessage.Offset)
.Uint64("TIME_FROM", shard->PreviousSuccessfullyProcessedLastMessage.SentTimestamp.MilliSeconds())
.Uint64("TIME_TO", boundary.MilliSeconds())
@@ -107,15 +120,19 @@ void TPurgeActor::MakeStage1Request(const ui64 shardId, TShard* shard, const std
.Queue(QueuePath_.QueueName)
.Shard(shardId)
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat_)
.QueryId(PURGE_QUEUE_ID)
.Counters(Counters_)
.RetryOnTimeout()
.OnExecuted(onExecuted)
.Params()
+ .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version))
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueuePath_.Version, shardId))
.Uint64("OFFSET_FROM", offsets.first)
.Uint64("OFFSET_TO", offsets.second)
.Uint64("NOW", Now().MilliSeconds())
- .Uint64("SHARD", shardId)
+ .AddWithType("SHARD", shardId, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize())
.ParentBuilder().Start();
}
@@ -180,15 +197,24 @@ void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages,
.Queue(QueuePath_.QueueName)
.Shard(shardId)
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat_)
.QueryId(PURGE_QUEUE_STAGE2_ID)
.Counters(Counters_)
.RetryOnTimeout()
.OnExecuted(onExecuted);
NClient::TWriteValue params = builder.ParamsValue();
+
+ params["QUEUE_ID_NUMBER"] = QueuePath_.Version;
+ params["QUEUE_ID_NUMBER_HASH"] = GetKeysHash(QueuePath_.Version);
+ params["QUEUE_ID_NUMBER_AND_SHARD_HASH"] = GetKeysHash(QueuePath_.Version, shardId);
params["CLEANUP_VERSION"] = cleanupVersion;
- params["SHARD"] = shardId;
params["NOW"] = TActivationContext::Now().MilliSeconds();
+ if (TablesFormat_ == 0) {
+ params["SHARD"] = shardId;
+ } else {
+ params["SHARD"] = static_cast<ui32>(shardId);
+ }
auto messagesParam = params["MESSAGES"];
FillMessagesParam(messagesParam, messages, shard->CurrentLastMessage.Offset, shard->CurrentLastMessage.SentTimestamp);
diff --git a/ydb/core/ymq/actor/purge.h b/ydb/core/ymq/actor/purge.h
index 2a16a5aa24..ac9d191d04 100644
--- a/ydb/core/ymq/actor/purge.h
+++ b/ydb/core/ymq/actor/purge.h
@@ -27,7 +27,13 @@ class TPurgeActor : public TActorBootstrapped<TPurgeActor> {
};
public:
- TPurgeActor(const TQueuePath& queuePath, TIntrusivePtr<TQueueCounters> counters, const TActorId& queueLeader, bool isFifo);
+ TPurgeActor(
+ const TQueuePath& queuePath,
+ ui32 tablesFormat,
+ TIntrusivePtr<TQueueCounters> counters,
+ const TActorId& queueLeader,
+ bool isFifo
+ );
~TPurgeActor();
void Bootstrap();
@@ -56,6 +62,7 @@ private:
private:
const TQueuePath QueuePath_;
+ const ui32 TablesFormat_;
/// A state of shard processing
TMap<ui64, TShard> Shards_;
const TString RequestId_;
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 250731b0df..d9f79471f7 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -1135,6 +1135,10 @@ void TQueueLeader::ProcessChangeMessageVisibilityBatch(TChangeMessageVisibilityB
.OnExecuted([this, requestId = req.RequestId, shard = req.Shard](const TSqsEvents::TEvExecuted::TRecord& ev) { OnVisibilityChanged(requestId, shard, ev); });
builder.Params()
+ .Uint64("QUEUE_ID_NUMBER", QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
+ .AddWithType("SHARD", req.Shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, req.Shard))
.Uint64("NOW", req.NowTimestamp.MilliSeconds())
.Uint64("GROUPS_READ_ATTEMPT_IDS_PERIOD", Cfg().GetGroupsReadAttemptIdsPeriodMs());
NClient::TWriteValue params = builder.ParamsValue();
@@ -1515,6 +1519,7 @@ void TQueueLeader::RequestMessagesCountMetrics(ui64 shard) {
.Params()
.Uint64("QUEUE_ID_NUMBER", QueueVersion_)
.Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard))
.AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.ParentBuilder().Start();
++MetricsQueriesInfly_;
@@ -1540,8 +1545,10 @@ void TQueueLeader::RequestOldestTimestampMetrics(ui64 shard) {
.OnExecuted([this, shard](const TSqsEvents::TEvExecuted::TRecord& ev) { ReceiveOldestTimestampMetrics(shard, ev); })
.Counters(Counters_)
.Params()
- .Uint64("QUEUE_ID_QUEUE_ID_NUMBER", QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER", QueueVersion_)
.Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
+ .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
+ .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard))
.Uint64("TIME_FROM", Shards_[shard].LastSuccessfulOldestMessageTimestampValueMs) // optimization for accurate range selection // timestamp is always nondecreasing
.ParentBuilder().Start();
++MetricsQueriesInfly_;
@@ -1665,21 +1672,27 @@ void TQueueLeader::CreateBackgroundActors() {
}
if (IsFifoQueue_) {
+ auto createCleaner = [&](TCleanupActor::ECleanupType type) {
+ auto actor = Register(new TCleanupActor(GetQueuePath(), TablesFormat_, SelfId(), type));
+ LOG_SQS_DEBUG(
+ "Created new " << type << " cleanup actor for queue " << TLogQueueName(UserName_, QueueName_)
+ << ". Actor id: " << actor
+ );
+ return actor;
+ };
if (!DeduplicationCleanupActor_) {
- DeduplicationCleanupActor_ = Register(new TCleanupActor(GetQueuePath(), SelfId(), TCleanupActor::ECleanupType::Deduplication));
- LOG_SQS_DEBUG("Created new deduplication cleanup actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << DeduplicationCleanupActor_);
+ DeduplicationCleanupActor_ = createCleaner(TCleanupActor::ECleanupType::Deduplication);
}
if (!ReadsCleanupActor_) {
- ReadsCleanupActor_ = Register(new TCleanupActor(GetQueuePath(), SelfId(), TCleanupActor::ECleanupType::Reads));
- LOG_SQS_DEBUG("Created new reads cleanup actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << ReadsCleanupActor_);
+ ReadsCleanupActor_ = createCleaner(TCleanupActor::ECleanupType::Reads);
}
}
if (!RetentionActor_) {
- RetentionActor_ = Register(new TRetentionActor(GetQueuePath(), SelfId()));
+ RetentionActor_ = Register(new TRetentionActor(GetQueuePath(), TablesFormat_, SelfId()));
LOG_SQS_DEBUG("Created new retention actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << RetentionActor_);
}
if (!PurgeActor_) {
- PurgeActor_ = Register(new TPurgeActor(GetQueuePath(), Counters_, SelfId(), IsFifoQueue_));
+ PurgeActor_ = Register(new TPurgeActor(GetQueuePath(), TablesFormat_, Counters_, SelfId(), IsFifoQueue_));
LOG_SQS_DEBUG("Created new purge actor for queue " << TLogQueueName(UserName_, QueueName_) << ". Actor id: " << PurgeActor_);
}
}
diff --git a/ydb/core/ymq/actor/retention.cpp b/ydb/core/ymq/actor/retention.cpp
index 9e610e8ed2..d5a0995559 100644
--- a/ydb/core/ymq/actor/retention.cpp
+++ b/ydb/core/ymq/actor/retention.cpp
@@ -6,14 +6,16 @@
#include <ydb/public/lib/value/value.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/ymq/base/debug_info.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
#include <library/cpp/actors/core/hfunc.h>
#include <util/random/random.h>
namespace NKikimr::NSQS {
-TRetentionActor::TRetentionActor(const TQueuePath& queuePath, const TActorId& queueLeader)
+TRetentionActor::TRetentionActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader)
: QueuePath_(queuePath)
+ , TablesFormat_(tablesFormat)
, RequestId_(CreateGuidAsString())
, QueueLeader_(queueLeader)
{
@@ -49,8 +51,11 @@ void TRetentionActor::SetRetentionBoundary() {
auto req = MakeHolder<TSqsEvents::TEvPurgeQueue>();
req->QueuePath = QueuePath_;
req->Boundary = TInstant::MilliSeconds(ui64(list[i]["RetentionBoundary"]));
- req->Shard = ui64(list[i]["Shard"]);
-
+ if (TablesFormat_ == 0) {
+ req->Shard = list[i]["Shard"];
+ } else {
+ req->Shard = static_cast<ui32>(list[i]["Shard"]);
+ }
RLOG_SQS_INFO("Set retention boundary for queue " << TLogQueueName(QueuePath_, req->Shard) << " to " << req->Boundary.MilliSeconds() << " (" << req->Boundary << ")");
Send(QueueLeader_, std::move(req));
@@ -66,10 +71,13 @@ void TRetentionActor::SetRetentionBoundary() {
.User(QueuePath_.UserName)
.Queue(QueuePath_.QueueName)
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat_)
.QueryId(SET_RETENTION_ID)
.RetryOnTimeout()
.OnExecuted(onExecuted)
.Params()
+ .Uint64("QUEUE_ID_NUMBER", QueuePath_.Version)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueuePath_.Version))
.Uint64("NOW", Now().MilliSeconds())
.Bool("PURGE", false)
.ParentBuilder().Start();
diff --git a/ydb/core/ymq/actor/retention.h b/ydb/core/ymq/actor/retention.h
index 724baddff3..e2413fafdc 100644
--- a/ydb/core/ymq/actor/retention.h
+++ b/ydb/core/ymq/actor/retention.h
@@ -9,7 +9,7 @@ namespace NKikimr::NSQS {
class TRetentionActor : public TActorBootstrapped<TRetentionActor> {
public:
- TRetentionActor(const TQueuePath& queuePath, const TActorId& queueLeader);
+ TRetentionActor(const TQueuePath& queuePath, ui32 tablesFormat, const TActorId& queueLeader);
~TRetentionActor();
void Bootstrap();
@@ -32,6 +32,7 @@ private:
private:
const TQueuePath QueuePath_;
+ const ui32 TablesFormat_;
const TString RequestId_;
const TActorId QueueLeader_;
};
diff --git a/ydb/core/ymq/actor/set_queue_attributes.cpp b/ydb/core/ymq/actor/set_queue_attributes.cpp
index df33ad99e6..492a2021a1 100644
--- a/ydb/core/ymq/actor/set_queue_attributes.cpp
+++ b/ydb/core/ymq/actor/set_queue_attributes.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/ymq/base/limits.h>
#include <ydb/core/ymq/base/dlq_helpers.h>
#include <ydb/core/ymq/base/queue_attributes.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
#include <ydb/public/lib/value/value.h>
#include <library/cpp/scheme/scheme.h>
@@ -71,10 +72,14 @@ private:
.User(UserName_)
.Queue(GetQueueName())
.QueueLeader(QueueLeader_)
+ .TablesFormat(TablesFormat())
.QueryId(SET_QUEUE_ATTRIBUTES_ID)
.Counters(QueueCounters_)
.RetryOnTimeout();
+ builder.Params().Uint64("QUEUE_ID_NUMBER", QueueVersion_.GetRef());
+ builder.Params().Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_.GetRef()));
+
builder.Params().OptionalUint64("MAX_RECEIVE_COUNT", ValidatedAttributes_.RedrivePolicy.MaxReceiveCount);
builder.Params().OptionalUtf8("DLQ_TARGET_ARN", ValidatedAttributes_.RedrivePolicy.TargetArn);
builder.Params().OptionalUtf8("DLQ_TARGET_NAME", ValidatedAttributes_.RedrivePolicy.TargetQueueName);
diff --git a/ydb/core/ymq/base/queue_path.h b/ydb/core/ymq/base/queue_path.h
index baaa781b00..c8154c245b 100644
--- a/ydb/core/ymq/base/queue_path.h
+++ b/ydb/core/ymq/base/queue_path.h
@@ -8,6 +8,7 @@ struct TQueuePath {
TString Root;
TString UserName;
TString QueueName;
+ ui64 Version;
TString VersionSuffix;
TQueuePath()
@@ -20,6 +21,7 @@ struct TQueuePath {
: Root(root)
, UserName(userName)
, QueueName(queueName)
+ , Version(version)
{
if (version) {
VersionSuffix = TString::Join("v", ToString(version));
diff --git a/ydb/core/ymq/queues/common/db_queries_defs.h b/ydb/core/ymq/queues/common/db_queries_defs.h
index 4ec1ebb8a0..8179ef96ec 100644
--- a/ydb/core/ymq/queues/common/db_queries_defs.h
+++ b/ydb/core/ymq/queues/common/db_queries_defs.h
@@ -23,3 +23,6 @@
#define DLQ_ID_AND_SHARD_KEYS_PARAM "%17$s"
#define DLQ_SHARD_TYPE_PARAM "%18$s"
#define DLQ_STATE_KEYS_PARAM "%19$s"
+
+#define SELECT_QUEUE_AND_SHARD_HASH_PARAM "%20$s"
+#define LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM "%21$s"
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp
index 47c16653ff..83915a8039 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.cpp
+++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp
@@ -5,6 +5,14 @@
namespace NKikimr::NSQS {
+ const char* TDbQueriesMaker::GetSelectQueueAndShardHash() const {
+ return TablesFormat_ == 1 ? "'QueueIdNumberAndShardHash" : "";
+ }
+
+ const char* TDbQueriesMaker::GetLoadQueueAndShardHashOrZero() const {
+ return TablesFormat_ == 1 ? "Member item 'QueueIdNumberAndShardHash" : "Uint64 '0";
+ }
+
TString TDbQueriesMaker::FillQuery(const char* query) const {
return Sprintf(
query,
@@ -29,7 +37,10 @@ namespace NKikimr::NSQS {
GetDlqIdKeys(), // 16
GetDlqIdAndShardKeys(), // 17
GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys() // 19
+ GetDlqStateKeys(), // 19
+
+ GetSelectQueueAndShardHash(), // 20
+ GetLoadQueueAndShardHashOrZero() // 21
);
}
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h
index e2da48d51d..e4514a0160 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.h
+++ b/ydb/core/ymq/queues/common/db_queries_maker.h
@@ -97,7 +97,17 @@ private:
const char* GetAllShardsRange() const {
if (TablesFormat_ == 1) {
- return "'('QueueIdNumber queueIdNumber queueIdNumber)";
+ if (IsFifo_) {
+ return R"__(
+ '('QueueIdNumberHash (Uint64 '0) (Uint64 '18446744073709551615))
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ )__";
+ }
+ return R"__(
+ '('QueueIdNumberAndShardHash (Uint64 '0) (Uint64 '18446744073709551615))
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ '('Shard (Uint32 '0) (Uint32 '4294967295))
+ )__";
}
return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
}
@@ -129,6 +139,9 @@ private:
return tablesFormat == 1 ? "Uint32" : "Uint64";
}
+ const char* GetSelectQueueAndShardHash() const;
+ const char* GetLoadQueueAndShardHashOrZero() const;
+
void FillQueueVars(
const TString& userName,
const TString& queueName,
diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp
index 4dd8494c76..0d5c5239e2 100644
--- a/ydb/core/ymq/queues/fifo/queries.cpp
+++ b/ydb/core/ymq/queues/fifo/queries.cpp
@@ -1253,7 +1253,10 @@ static const char* const DeduplicationCleanupQuery = R"__(
(Map dedupToErase (lambda '(item) (block '(
(return (EraseRow dedupTable '(
- '('DedupId (Member item 'DedupId)))))))))
+ )__" QUEUE_ID_KEYS_PARAM R"__(
+ '('DedupId (Member item 'DedupId)))
+ ))
+ ))))
))
)
)__";
@@ -1290,7 +1293,10 @@ static const char* const ReadsCleanupQuery = R"__(
(Map readsToErase (lambda '(item) (block '(
(return (EraseRow readsTable '(
- '('ReceiveAttemptId (Member item 'ReceiveAttemptId)))))))))
+ )__" QUEUE_ID_KEYS_PARAM R"__(
+ '('ReceiveAttemptId (Member item 'ReceiveAttemptId))
+ )))
+ ))))
))
)
)__";
@@ -1331,7 +1337,7 @@ const char* const SetRetentionQuery = R"__(
(Bool 'false)))
(return (AsStruct
- '('Shard (Uint64 '0))
+ '('Shard ()__" SHARD_TYPE_PARAM R"__( '0))
'('RetentionBoundary (Max boundary (Member item 'RetentionBoundary)))
'('Updated updated))))))))
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 8b0cc55b39..23ee4ab5d6 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -201,9 +201,13 @@ const char* const PurgeQueueQuery = R"__(
(let modifiedTimestamp (Max now (Member stateRead 'LastModifiedTimestamp)))
(let messageRange '(
- '('Offset offsetFrom offsetTo)))
+ )__" QUEUE_ID_AND_SHARD_KEYS_RANGE_PARAM R"__(
+ '('Offset offsetFrom offsetTo)
+ ))
(let inflyRange '(
- '('Offset offsetFrom offsetTo)))
+ )__" QUEUE_ID_AND_SHARD_KEYS_RANGE_PARAM R"__(
+ '('Offset offsetFrom offsetTo)
+ ))
(let messageSelect '(
'SentTimestamp
'Offset
@@ -239,7 +243,6 @@ const char* const PurgeQueueStage2Query = R"__(
(let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
- (let shard (Parameter 'SHARD (DataType 'Uint64)))
(let cleanupVersion (Parameter 'CLEANUP_VERSION (DataType 'Uint64)))
(let now (Parameter 'NOW (DataType 'Uint64)))
(let messages (Parameter 'MESSAGES
@@ -271,6 +274,7 @@ const char* const PurgeQueueStage2Query = R"__(
(let inflyRecords
(MapParameter messages (lambda '(item) (block '(
(let row '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('Offset (Member item 'Offset))))
(let fields '(
'Offset
@@ -285,6 +289,7 @@ const char* const PurgeQueueStage2Query = R"__(
(let messageRecords
(MapParameter messages (lambda '(item) (block '(
(let row '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('Offset (Member item 'Offset))))
(let fields '(
'Offset
@@ -332,27 +337,39 @@ const char* const PurgeQueueStage2Query = R"__(
(If versionIsSame
(Map inflyRecordsExisted (lambda '(item) (block '(
(return (EraseRow inflyTable '(
- '('Offset (Member item 'Offset)))))))))
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
+ '('Offset (Member item 'Offset))
+ )))
+ ))))
(AsList (Void)))
(If versionIsSame
(Map messages (lambda '(item) (block '(
(return (EraseRow dataTable '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('RandomId (Member item 'RandomId))
- '('Offset (Member item 'Offset)))))))))
+ '('Offset (Member item 'Offset))
+ )))
+ ))))
(AsList (Void)))
(If versionIsSame
(Map messageRecordsExisted (lambda '(item) (block '(
(return (EraseRow msgTable '(
- '('Offset (Member item 'Offset)))))))))
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
+ '('Offset (Member item 'Offset))
+ )))
+ ))))
(AsList (Void)))
(If versionIsSame
(Map messages (lambda '(item) (block '(
(return (EraseRow sentTsIdx '(
+ )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('SentTimestamp (Member item 'SentTimestamp))
- '('Offset (Member item 'Offset)))))))))
+ '('Offset (Member item 'Offset))
+ )))
+ ))))
(AsList (Void)))
))
)
@@ -999,11 +1016,10 @@ const char* const WriteMessageQuery = R"__(
)
)__";
-// range more slow???
-// check column renames test
const char* const SetRetentionQuery = R"__(
(
- (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let now (Parameter 'NOW (DataType 'Uint64)))
(let purge (Parameter 'PURGE (DataType 'Bool)))
@@ -1025,7 +1041,9 @@ const char* const SetRetentionQuery = R"__(
))
(let fields '(
')__" SHARD_COLUMN_NAME_PARAM R"__(
- 'RetentionBoundary))
+ )__" SELECT_QUEUE_AND_SHARD_HASH_PARAM R"__(
+ 'RetentionBoundary
+ ))
(let records (Member (SelectRange stateTable range fields '()) 'List))
(let result
@@ -1039,6 +1057,9 @@ const char* const SetRetentionQuery = R"__(
'('Shard (
Member item ')__" SHARD_COLUMN_NAME_PARAM R"__(
))
+ '('QueueIdNumberAndShardHash (
+ )__" LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM R"__(
+ ))
'('RetentionBoundary (Max boundary (Member item 'RetentionBoundary)))
'('Updated updated))))))))
@@ -1051,6 +1072,7 @@ const char* const SetRetentionQuery = R"__(
(Map updated (lambda '(item) (block '(
(let shard (Member item 'Shard))
+ (let queueIdNumberAndShardHash (Member item 'QueueIdNumberAndShardHash))
(let row '(
)__" STATE_KEYS_PARAM R"__(
))
diff --git a/ydb/tests/functional/sqs/sqs_test_base.py b/ydb/tests/functional/sqs/sqs_test_base.py
index 05d5303f64..de6623b887 100644
--- a/ydb/tests/functional/sqs/sqs_test_base.py
+++ b/ydb/tests/functional/sqs/sqs_test_base.py
@@ -35,6 +35,12 @@ IS_FIFO_PARAMS = {
'ids': ['fifo', 'std'],
}
+TABLES_FORMAT_PARAMS = {
+ 'argnames': 'tables_format',
+ 'argvalues': [0, 1],
+ 'ids': ['tables_format_v0', 'tables_format_v1'],
+}
+
POLLING_PARAMS = {
'argnames': 'polling_wait_timeout',
'argvalues': [0, 1],
@@ -144,6 +150,7 @@ class KikimrSqsTestBase(object):
database = '/Root'
sqs_root = '/Root/SQS'
use_in_memory_pdisks = True
+ tables_format_per_user = {}
@classmethod
def setup_class(cls):
@@ -627,14 +634,18 @@ class KikimrSqsTestBase(object):
time.sleep(1) # wait node to start
def _smart_make_table_path(self, user_name, queue_name, queue_version, shard, table_name):
- table_path = '{}/{}'.format(self.sqs_root, user_name)
- if queue_name is not None:
- table_path += '/{}'.format(queue_name)
- if queue_version is not None and queue_version != 0:
- table_path += '/v{}'.format(queue_version)
- if shard is not None:
- table_path += '/{}'.format(shard)
-
+ tables_format = self.tables_format_per_user.get(user_name, 0)
+ table_path = self.sqs_root
+ if tables_format == 0:
+ table_path += '/{}'.format(user_name)
+ if queue_name is not None:
+ table_path += '/{}'.format(queue_name)
+ if queue_version is not None and queue_version != 0:
+ table_path += '/v{}'.format(queue_version)
+ if shard is not None:
+ table_path += '/{}'.format(shard)
+ else:
+ table_path += '/{}'.format('.FIFO' if queue_name.endswith('.fifo') else '.STD')
return table_path + '/{}'.format(table_name)
def _get_queue_version_number(self, user_name, queue_name):
@@ -751,3 +762,18 @@ class KikimrSqsTestBase(object):
for shard in range(shards):
session.drop_table(self._smart_make_table_path(username, queuename, version, shard, 'Messages'))
session.drop_table(self._smart_make_table_path(username, queuename, version, shard, 'MessageData'))
+
+ def _set_tables_format(self, username=None, tables_format=1):
+ if username is None:
+ username = self._username
+ self._execute_yql_query(
+ f'UPSERT INTO `{self.sqs_root}/.Settings` (Account, Name, Value) \
+ VALUES ("{username}", "CreateQueuesWithTabletFormat", "{tables_format}")'
+ )
+ self.tables_format_per_user[username] = tables_format
+
+ def _init_with_params(self, is_fifo=None, tables_format=None):
+ if is_fifo and not self.queue_name.endswith('.fifo'):
+ self.queue_name += '.fifo'
+ if tables_format is not None:
+ self._set_tables_format(tables_format=tables_format)
diff --git a/ydb/tests/functional/sqs/test_garbage_collection.py b/ydb/tests/functional/sqs/test_garbage_collection.py
index 992c8a6513..aff51e3bcd 100644
--- a/ydb/tests/functional/sqs/test_garbage_collection.py
+++ b/ydb/tests/functional/sqs/test_garbage_collection.py
@@ -13,7 +13,7 @@ from sqs_requests_client import SqsHttpApi
from sqs_matchers import ReadResponseMatcher
from sqs_test_base import to_bytes
-from sqs_test_base import KikimrSqsTestBase, VISIBILITY_CHANGE_METHOD_PARAMS, IS_FIFO_PARAMS
+from sqs_test_base import KikimrSqsTestBase, VISIBILITY_CHANGE_METHOD_PARAMS, IS_FIFO_PARAMS, TABLES_FORMAT_PARAMS
def send_message(server, username, queue_url, sqs_port, body, seq_no, group_id):
@@ -113,9 +113,10 @@ class TestSqsGarbageCollection(KikimrSqsTestBase):
break
@pytest.mark.parametrize(**IS_FIFO_PARAMS)
- def test_removes_messages_by_retention_time(self, is_fifo):
- if is_fifo:
- self.queue_name = self.queue_name + '.fifo'
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_removes_messages_by_retention_time(self, is_fifo, tables_format):
+ self._init_with_params(is_fifo, tables_format)
+
self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo)
number_of_mesages_to_write = 110 if is_fifo else 220
@@ -166,8 +167,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase):
assert_that(number_of_messages, equal_to(0))
self._check_queue_tables_are_empty()
- def test_cleanups_deduplication_table(self):
- self.queue_name = self.queue_name + '.fifo'
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_cleanups_deduplication_table(self, tables_format):
+ self._init_with_params(is_fifo=True, tables_format=tables_format)
self._create_queue_and_assert(self.queue_name, is_fifo=True)
# do the same again to ensure that this process will not stop
@@ -177,8 +179,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase):
self.wait_fifo_table_empty('Deduplication')
@pytest.mark.parametrize('random_groups_max_count', [30, 200])
- def test_cleanups_reads_table(self, random_groups_max_count):
- self.queue_name = self.queue_name + '.fifo'
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_cleanups_reads_table(self, random_groups_max_count, tables_format):
+ self._init_with_params(is_fifo=True, tables_format=tables_format)
self._create_queue_and_assert(self.queue_name, is_fifo=True)
# do the same again to ensure that this process will not stop
@@ -194,8 +197,9 @@ class TestSqsGarbageCollection(KikimrSqsTestBase):
self.delete_messages(self.queue_url, [r['ReceiptHandle'] for r in read_result])
@pytest.mark.parametrize(**VISIBILITY_CHANGE_METHOD_PARAMS)
- def test_visibility_change_cleanups_proper_receive_attempt_id(self, delete_message):
- self.queue_name = self.queue_name + '.fifo'
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_visibility_change_cleanups_proper_receive_attempt_id(self, delete_message, tables_format):
+ self._init_with_params(is_fifo=True, tables_format=tables_format)
queue_url = self._create_queue_and_assert(self.queue_name, is_fifo=True)
groups_count = 5
for group_number in range(groups_count):
diff --git a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
index bfc8734958..e98e0c8dd0 100644
--- a/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
+++ b/ydb/tests/functional/sqs/test_multiplexing_tables_format.py
@@ -10,14 +10,6 @@ from sqs_matchers import ReadResponseMatcher
class MultiplexingTablesFormatTest(KikimrSqsTestBase):
- def _set_new_format_settings(self, username=None, tables_format=1):
- if username is None:
- username = self._username
- self._execute_yql_query(
- f'UPSERT INTO `{self.sqs_root}/.Settings` (Account, Name, Value) \
- VALUES ("{username}", "CreateQueuesWithTabletFormat", "{tables_format}")'
- )
-
def create_queue(self, is_fifo):
if is_fifo and not self.queue_name.endswith('.fifo'):
self.queue_name = self.queue_name + '.fifo'
@@ -31,13 +23,13 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
assert(False)
def create_queue_with_wrong_tables_format(self, tables_format):
- self._set_new_format_settings(tables_format='qwerty')
+ self._set_tables_format(tables_format='qwerty')
self.create_queue_must_fail(True)
self.create_queue_must_fail(False)
@pytest.mark.parametrize(**IS_FIFO_PARAMS)
def test_create_queue(self, is_fifo):
- self._set_new_format_settings()
+ self._set_tables_format()
self.create_queue(is_fifo)
def test_create_queue_with_incorrect_tables_format(self):
@@ -51,7 +43,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
@pytest.mark.parametrize(**IS_FIFO_PARAMS)
def test_send_message(self, is_fifo):
- self._set_new_format_settings()
+ self._set_tables_format()
created_queue_url = self.create_queue(is_fifo)
self._send_message_and_assert(
created_queue_url,
@@ -62,7 +54,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
@pytest.mark.parametrize(**IS_FIFO_PARAMS)
def test_read_message(self, is_fifo):
- self._set_new_format_settings()
+ self._set_tables_format()
created_queue_url = self.create_queue(is_fifo)
self.seq_no += 1
message_id = self._send_message_and_assert(
@@ -79,7 +71,7 @@ class MultiplexingTablesFormatTest(KikimrSqsTestBase):
)
def do_test_double_create(self, is_fifo, tables_format):
- self._set_new_format_settings(tables_format=tables_format)
+ self._set_tables_format(tables_format=tables_format)
self.create_queue(is_fifo)
self.create_queue(is_fifo)