aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-06-14 14:30:56 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-06-14 14:30:56 +0300
commitc38f3409183af2ce08203f9b6d30714cc99fa8a5 (patch)
treed8226264386a40055ee317fe9812ef127fc18449
parent16f8be4f481c275c34795233c18f8d078382fcb3 (diff)
downloadydb-c38f3409183af2ce08203f9b6d30714cc99fa8a5.tar.gz
[ymq] change keys in common state table (remove full scan query)
init ref:ba679eabbfb1f61c69f391954a71309d7e5fa61b
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp6
-rw-r--r--ydb/core/ymq/actor/queue_schema.cpp9
-rw-r--r--ydb/core/ymq/queues/common/db_queries_defs.h3
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.cpp47
-rw-r--r--ydb/core/ymq/queues/common/db_queries_maker.h35
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp21
-rw-r--r--ydb/tests/library/sqs/tables.py8
7 files changed, 67 insertions, 62 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index d9f79471f7..c36bb5a432 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -1519,7 +1519,6 @@ void TQueueLeader::RequestMessagesCountMetrics(ui64 shard) {
.Params()
.Uint64("QUEUE_ID_NUMBER", QueueVersion_)
.Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
- .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard))
.AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.ParentBuilder().Start();
++MetricsQueriesInfly_;
@@ -1751,6 +1750,7 @@ void TQueueLeader::StartLoadingInfly(ui64 shard, bool afterFailure) {
.Counters(Counters_)
.Params()
.Uint64("QUEUE_ID_NUMBER", QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
.AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard))
.ParentBuilder().Start();
@@ -1769,8 +1769,8 @@ void TQueueLeader::StartLoadingInfly(ui64 shard, bool afterFailure) {
.Counters(Counters_)
.Params()
.Uint64("QUEUE_ID_NUMBER", QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
.AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
- .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard))
.ParentBuilder().Start();
}
@@ -2599,6 +2599,7 @@ void TQueueLeader::TLoadBatch::Execute(TQueueLeader* leader) {
.RetryOnTimeout()
.Params()
.Uint64("QUEUE_ID_NUMBER", leader->QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(leader->QueueVersion_))
.Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(leader->QueueVersion_, Shard))
.Uint64("NOW", now.MilliSeconds())
.Uint64("READ_ID", RandomNumber<ui64>())
@@ -2655,6 +2656,7 @@ void TQueueLeader::TLoadBatch::Execute(TQueueLeader* leader) {
builder.Params()
.Uint64("DLQ_ID_NUMBER", dlqInfo.QueueVersion)
+ .Uint64("DLQ_ID_NUMBER_HASH", GetKeysHash(dlqInfo.QueueVersion))
.AddWithType("DLQ_SHARD", dlqShard, dlqInfo.TablesFormat == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64)
.Uint64("DLQ_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(dlqInfo.QueueVersion, dlqShard))
.Uint64("DEAD_LETTERS_COUNT", deadLettersCounter);
diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp
index 5de9ba2058..a477a4d6be 100644
--- a/ydb/core/ymq/actor/queue_schema.cpp
+++ b/ydb/core/ymq/actor/queue_schema.cpp
@@ -906,11 +906,10 @@ static const char* const CommitQueueParamsQuery = R"__(
(ListIf willCommit (UpdateRow eventsTable eventsRow eventsUpdate))
(ListIf willCommit (UpdateRow attrsTable attrRow attrUpdate))
- (If (Not willCommit) (AsList (Void))
- (Map (Enumerate queueIdNumberAndShardHashes) (lambda '(item) (block '(
- (let shardOriginal (Nth item '0))
+
+ (If (Not willCommit) (AsList (Void))
+ (Map (ListFromRange (Uint64 '0) shards) (lambda '(shardOriginal) (block '(
(let shard (Cast shardOriginal 'Uint32))
- (let queueIdNumberAndShardHash (Nth item '1))
(let row '(%5$s))
(let update '(
@@ -937,7 +936,7 @@ TString GetStateTableKeys(ui32 tablesFormat, bool isFifo) {
)__";
}
return R"__(
- '('QueueIdNumberAndShardHash queueIdNumberAndShardHash)
+ '('QueueIdNumberHash queueIdNumberHash)
'('QueueIdNumber version)
'('Shard shard)
)__";
diff --git a/ydb/core/ymq/queues/common/db_queries_defs.h b/ydb/core/ymq/queues/common/db_queries_defs.h
index 8179ef96ec..4ec1ebb8a0 100644
--- a/ydb/core/ymq/queues/common/db_queries_defs.h
+++ b/ydb/core/ymq/queues/common/db_queries_defs.h
@@ -23,6 +23,3 @@
#define DLQ_ID_AND_SHARD_KEYS_PARAM "%17$s"
#define DLQ_SHARD_TYPE_PARAM "%18$s"
#define DLQ_STATE_KEYS_PARAM "%19$s"
-
-#define SELECT_QUEUE_AND_SHARD_HASH_PARAM "%20$s"
-#define LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM "%21$s"
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp
index 83915a8039..abaedf5039 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.cpp
+++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp
@@ -2,15 +2,49 @@
#include "queries.h"
+namespace {
+ const char* const STD_STATE_KEYS = R"__(
+ '('QueueIdNumberHash queueIdNumberHash)
+ '('QueueIdNumber queueIdNumber)
+ '('Shard shard)
+ )__";
+
+ const char* const DLQ_STD_STATE_KEYS = R"__(
+ '('QueueIdNumberHash dlqIdNumberHash)
+ '('QueueIdNumber dlqIdNumber)
+ '('Shard dlqShard)
+ )__";
+}
+
+
namespace NKikimr::NSQS {
+ const char* TDbQueriesMaker::GetStateKeys() const {
+ if (TablesFormat_ == 0) {
+ return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
+ }
+ return IsFifo_ ? GetIdKeys() : STD_STATE_KEYS;
+ }
- const char* TDbQueriesMaker::GetSelectQueueAndShardHash() const {
- return TablesFormat_ == 1 ? "'QueueIdNumberAndShardHash" : "";
+ const char* TDbQueriesMaker::GetDlqStateKeys() const {
+ if (TablesFormat_ == 0) {
+ return IsFifo_ ? "'('State (Uint64 '0))" : "'('State dlqShard)";
+ }
+ return IsFifo_ ? GetDlqIdKeys() : DLQ_STD_STATE_KEYS;
}
- const char* TDbQueriesMaker::GetLoadQueueAndShardHashOrZero() const {
- return TablesFormat_ == 1 ? "Member item 'QueueIdNumberAndShardHash" : "Uint64 '0";
+ const char* TDbQueriesMaker::GetAllShardsRange() const {
+ if (TablesFormat_ == 1) {
+ if (IsFifo_) {
+ return QUEUE_ID_KEYS_RANGE;
+ }
+ return R"__(
+ '('QueueIdNumberHash queueIdNumberHash queueIdNumberHash)
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ '('Shard (Uint32 '0) (Uint32 '4294967295))
+ )__";
+ }
+ return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
}
TString TDbQueriesMaker::FillQuery(const char* query) const {
@@ -37,10 +71,7 @@ namespace NKikimr::NSQS {
GetDlqIdKeys(), // 16
GetDlqIdAndShardKeys(), // 17
GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys(), // 19
-
- GetSelectQueueAndShardHash(), // 20
- GetLoadQueueAndShardHashOrZero() // 21
+ GetDlqStateKeys() // 19
);
}
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h
index e4514a0160..8597ad058d 100644
--- a/ydb/core/ymq/queues/common/db_queries_maker.h
+++ b/ydb/core/ymq/queues/common/db_queries_maker.h
@@ -81,36 +81,10 @@ public:
TString GetMatchQueueAttributesQuery() const;
private:
- const char* GetStateKeys() const {
- if (TablesFormat_ == 0) {
- return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
- }
- return IsFifo_ ? GetIdKeys() : GetIdAndShardKeys();
- }
-
- const char* GetDlqStateKeys() const {
- if (TablesFormat_ == 0) {
- return IsFifo_ ? "'('State (Uint64 '0))" : "'('State dlqShard)";
- }
- return IsFifo_ ? GetDlqIdKeys() : GetDlqIdAndShardKeys();
- }
+ const char* GetStateKeys() const;
+ const char* GetDlqStateKeys() const;
+ const char* GetAllShardsRange() const;
- const char* GetAllShardsRange() const {
- if (TablesFormat_ == 1) {
- if (IsFifo_) {
- return R"__(
- '('QueueIdNumberHash (Uint64 '0) (Uint64 '18446744073709551615))
- '('QueueIdNumber queueIdNumber queueIdNumber)
- )__";
- }
- return R"__(
- '('QueueIdNumberAndShardHash (Uint64 '0) (Uint64 '18446744073709551615))
- '('QueueIdNumber queueIdNumber queueIdNumber)
- '('Shard (Uint32 '0) (Uint32 '4294967295))
- )__";
- }
- return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
- }
const char* GetAttrKeys() const {
return TablesFormat_ == 1 ? QUEUE_ID_KEYS : "'('State (Uint64 '0))";
}
@@ -139,9 +113,6 @@ private:
return tablesFormat == 1 ? "Uint32" : "Uint64";
}
- const char* GetSelectQueueAndShardHash() const;
- const char* GetLoadQueueAndShardHashOrZero() const;
-
void FillQueueVars(
const TString& userName,
const TString& queueName,
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 23ee4ab5d6..73a7fa71cf 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -715,10 +715,12 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('VisibilityDeadline (DataType 'Uint64))))))
(let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
(let dlqIdNumber (Parameter 'DLQ_ID_NUMBER (DataType 'Uint64)))
+ (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64)))
(let dlqShard (Parameter 'DLQ_SHARD (DataType ')__" DLQ_SHARD_TYPE_PARAM R"__()))
(let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
@@ -922,6 +924,7 @@ const char* const LoadOrRedriveMessageQuery = R"__(
const char* const WriteMessageQuery = R"__(
(
(let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
(let randomId (Parameter 'RANDOM_ID (DataType 'Uint64)))
@@ -1041,7 +1044,6 @@ const char* const SetRetentionQuery = R"__(
))
(let fields '(
')__" SHARD_COLUMN_NAME_PARAM R"__(
- )__" SELECT_QUEUE_AND_SHARD_HASH_PARAM R"__(
'RetentionBoundary
))
(let records (Member (SelectRange stateTable range fields '()) 'List))
@@ -1057,9 +1059,6 @@ const char* const SetRetentionQuery = R"__(
'('Shard (
Member item ')__" SHARD_COLUMN_NAME_PARAM R"__(
))
- '('QueueIdNumberAndShardHash (
- )__" LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM R"__(
- ))
'('RetentionBoundary (Max boundary (Member item 'RetentionBoundary)))
'('Updated updated))))))))
@@ -1072,7 +1071,6 @@ const char* const SetRetentionQuery = R"__(
(Map updated (lambda '(item) (block '(
(let shard (Member item 'Shard))
- (let queueIdNumberAndShardHash (Member item 'QueueIdNumberAndShardHash))
(let row '(
)__" STATE_KEYS_PARAM R"__(
))
@@ -1139,6 +1137,7 @@ const char* const GetRetentionOffsetQuery = R"__(
const char* const LoadInflyQuery = R"__(
(
(let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
@@ -1171,9 +1170,9 @@ const char* const LoadInflyQuery = R"__(
const char* const GetStateQuery = R"__(
(
- (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
- (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
- (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
+ (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State)
@@ -1269,9 +1268,9 @@ const char* const GetUserSettingsQuery = R"__(
const char* const GetMessageCountMetricsQuery = R"__(
(
- (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
- (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
- (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State)
diff --git a/ydb/tests/library/sqs/tables.py b/ydb/tests/library/sqs/tables.py
index 433ffb3f33..0381e810fa 100644
--- a/ydb/tests/library/sqs/tables.py
+++ b/ydb/tests/library/sqs/tables.py
@@ -132,7 +132,13 @@ def create_attibutes_table(root, session, queue_type):
def create_state_table(root, session, queue_type):
- queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD))
+ queue_keys = columns = [
+ ('QueueIdNumberHash', ydb.PrimitiveType.Uint64),
+ ('QueueIdNumber', ydb.PrimitiveType.Uint64),
+ ]
+ if queue_type == QueueType.STD:
+ queue_keys.append(('Shard', ydb.PrimitiveType.Uint32))
+
columns = queue_keys + [
('CleanupTimestamp', ydb.PrimitiveType.Uint64),
('CreatedTimestamp', ydb.PrimitiveType.Uint64),