diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-06-14 14:30:56 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-06-14 14:30:56 +0300 |
commit | c38f3409183af2ce08203f9b6d30714cc99fa8a5 (patch) | |
tree | d8226264386a40055ee317fe9812ef127fc18449 | |
parent | 16f8be4f481c275c34795233c18f8d078382fcb3 (diff) | |
download | ydb-c38f3409183af2ce08203f9b6d30714cc99fa8a5.tar.gz |
[ymq] change keys in common state table (remove full scan query)
init
ref:ba679eabbfb1f61c69f391954a71309d7e5fa61b
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_schema.cpp | 9 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_defs.h | 3 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.cpp | 47 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/db_queries_maker.h | 35 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 21 | ||||
-rw-r--r-- | ydb/tests/library/sqs/tables.py | 8 |
7 files changed, 67 insertions, 62 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index d9f79471f7..c36bb5a432 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -1519,7 +1519,6 @@ void TQueueLeader::RequestMessagesCountMetrics(ui64 shard) { .Params() .Uint64("QUEUE_ID_NUMBER", QueueVersion_) .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) - .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard)) .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .ParentBuilder().Start(); ++MetricsQueriesInfly_; @@ -1751,6 +1750,7 @@ void TQueueLeader::StartLoadingInfly(ui64 shard, bool afterFailure) { .Counters(Counters_) .Params() .Uint64("QUEUE_ID_NUMBER", QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard)) .ParentBuilder().Start(); @@ -1769,8 +1769,8 @@ void TQueueLeader::StartLoadingInfly(ui64 shard, bool afterFailure) { .Counters(Counters_) .Params() .Uint64("QUEUE_ID_NUMBER", QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) .AddWithType("SHARD", shard, TablesFormat_ == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) - .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(QueueVersion_, shard)) .ParentBuilder().Start(); } @@ -2599,6 +2599,7 @@ void TQueueLeader::TLoadBatch::Execute(TQueueLeader* leader) { .RetryOnTimeout() .Params() .Uint64("QUEUE_ID_NUMBER", leader->QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(leader->QueueVersion_)) .Uint64("QUEUE_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(leader->QueueVersion_, Shard)) .Uint64("NOW", now.MilliSeconds()) .Uint64("READ_ID", RandomNumber<ui64>()) @@ -2655,6 +2656,7 @@ void TQueueLeader::TLoadBatch::Execute(TQueueLeader* leader) { builder.Params() .Uint64("DLQ_ID_NUMBER", dlqInfo.QueueVersion) + .Uint64("DLQ_ID_NUMBER_HASH", GetKeysHash(dlqInfo.QueueVersion)) .AddWithType("DLQ_SHARD", dlqShard, dlqInfo.TablesFormat == 1 ? NScheme::NTypeIds::Uint32 : NScheme::NTypeIds::Uint64) .Uint64("DLQ_ID_NUMBER_AND_SHARD_HASH", GetKeysHash(dlqInfo.QueueVersion, dlqShard)) .Uint64("DEAD_LETTERS_COUNT", deadLettersCounter); diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp index 5de9ba2058..a477a4d6be 100644 --- a/ydb/core/ymq/actor/queue_schema.cpp +++ b/ydb/core/ymq/actor/queue_schema.cpp @@ -906,11 +906,10 @@ static const char* const CommitQueueParamsQuery = R"__( (ListIf willCommit (UpdateRow eventsTable eventsRow eventsUpdate)) (ListIf willCommit (UpdateRow attrsTable attrRow attrUpdate)) - (If (Not willCommit) (AsList (Void)) - (Map (Enumerate queueIdNumberAndShardHashes) (lambda '(item) (block '( - (let shardOriginal (Nth item '0)) + + (If (Not willCommit) (AsList (Void)) + (Map (ListFromRange (Uint64 '0) shards) (lambda '(shardOriginal) (block '( (let shard (Cast shardOriginal 'Uint32)) - (let queueIdNumberAndShardHash (Nth item '1)) (let row '(%5$s)) (let update '( @@ -937,7 +936,7 @@ TString GetStateTableKeys(ui32 tablesFormat, bool isFifo) { )__"; } return R"__( - '('QueueIdNumberAndShardHash queueIdNumberAndShardHash) + '('QueueIdNumberHash queueIdNumberHash) '('QueueIdNumber version) '('Shard shard) )__"; diff --git a/ydb/core/ymq/queues/common/db_queries_defs.h b/ydb/core/ymq/queues/common/db_queries_defs.h index 8179ef96ec..4ec1ebb8a0 100644 --- a/ydb/core/ymq/queues/common/db_queries_defs.h +++ b/ydb/core/ymq/queues/common/db_queries_defs.h @@ -23,6 +23,3 @@ #define DLQ_ID_AND_SHARD_KEYS_PARAM "%17$s"
#define DLQ_SHARD_TYPE_PARAM "%18$s"
#define DLQ_STATE_KEYS_PARAM "%19$s"
-
-#define SELECT_QUEUE_AND_SHARD_HASH_PARAM "%20$s"
-#define LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM "%21$s"
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.cpp b/ydb/core/ymq/queues/common/db_queries_maker.cpp index 83915a8039..abaedf5039 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.cpp +++ b/ydb/core/ymq/queues/common/db_queries_maker.cpp @@ -2,15 +2,49 @@ #include "queries.h"
+namespace {
+ const char* const STD_STATE_KEYS = R"__(
+ '('QueueIdNumberHash queueIdNumberHash)
+ '('QueueIdNumber queueIdNumber)
+ '('Shard shard)
+ )__";
+
+ const char* const DLQ_STD_STATE_KEYS = R"__(
+ '('QueueIdNumberHash dlqIdNumberHash)
+ '('QueueIdNumber dlqIdNumber)
+ '('Shard dlqShard)
+ )__";
+}
+
+
namespace NKikimr::NSQS {
+ const char* TDbQueriesMaker::GetStateKeys() const {
+ if (TablesFormat_ == 0) {
+ return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
+ }
+ return IsFifo_ ? GetIdKeys() : STD_STATE_KEYS;
+ }
- const char* TDbQueriesMaker::GetSelectQueueAndShardHash() const {
- return TablesFormat_ == 1 ? "'QueueIdNumberAndShardHash" : "";
+ const char* TDbQueriesMaker::GetDlqStateKeys() const {
+ if (TablesFormat_ == 0) {
+ return IsFifo_ ? "'('State (Uint64 '0))" : "'('State dlqShard)";
+ }
+ return IsFifo_ ? GetDlqIdKeys() : DLQ_STD_STATE_KEYS;
}
- const char* TDbQueriesMaker::GetLoadQueueAndShardHashOrZero() const {
- return TablesFormat_ == 1 ? "Member item 'QueueIdNumberAndShardHash" : "Uint64 '0";
+ const char* TDbQueriesMaker::GetAllShardsRange() const {
+ if (TablesFormat_ == 1) {
+ if (IsFifo_) {
+ return QUEUE_ID_KEYS_RANGE;
+ }
+ return R"__(
+ '('QueueIdNumberHash queueIdNumberHash queueIdNumberHash)
+ '('QueueIdNumber queueIdNumber queueIdNumber)
+ '('Shard (Uint32 '0) (Uint32 '4294967295))
+ )__";
+ }
+ return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
}
TString TDbQueriesMaker::FillQuery(const char* query) const {
@@ -37,10 +71,7 @@ namespace NKikimr::NSQS { GetDlqIdKeys(), // 16
GetDlqIdAndShardKeys(), // 17
GetShardColumnType(DlqTablesFormat_), // 18
- GetDlqStateKeys(), // 19
-
- GetSelectQueueAndShardHash(), // 20
- GetLoadQueueAndShardHashOrZero() // 21
+ GetDlqStateKeys() // 19
);
}
diff --git a/ydb/core/ymq/queues/common/db_queries_maker.h b/ydb/core/ymq/queues/common/db_queries_maker.h index e4514a0160..8597ad058d 100644 --- a/ydb/core/ymq/queues/common/db_queries_maker.h +++ b/ydb/core/ymq/queues/common/db_queries_maker.h @@ -81,36 +81,10 @@ public: TString GetMatchQueueAttributesQuery() const;
private:
- const char* GetStateKeys() const {
- if (TablesFormat_ == 0) {
- return IsFifo_ ? "'('State (Uint64 '0))" : "'('State shard)";
- }
- return IsFifo_ ? GetIdKeys() : GetIdAndShardKeys();
- }
-
- const char* GetDlqStateKeys() const {
- if (TablesFormat_ == 0) {
- return IsFifo_ ? "'('State (Uint64 '0))" : "'('State dlqShard)";
- }
- return IsFifo_ ? GetDlqIdKeys() : GetDlqIdAndShardKeys();
- }
+ const char* GetStateKeys() const;
+ const char* GetDlqStateKeys() const;
+ const char* GetAllShardsRange() const;
- const char* GetAllShardsRange() const {
- if (TablesFormat_ == 1) {
- if (IsFifo_) {
- return R"__(
- '('QueueIdNumberHash (Uint64 '0) (Uint64 '18446744073709551615))
- '('QueueIdNumber queueIdNumber queueIdNumber)
- )__";
- }
- return R"__(
- '('QueueIdNumberAndShardHash (Uint64 '0) (Uint64 '18446744073709551615))
- '('QueueIdNumber queueIdNumber queueIdNumber)
- '('Shard (Uint32 '0) (Uint32 '4294967295))
- )__";
- }
- return "'('State (Uint64 '0) (Uint64 '18446744073709551615))";
- }
const char* GetAttrKeys() const {
return TablesFormat_ == 1 ? QUEUE_ID_KEYS : "'('State (Uint64 '0))";
}
@@ -139,9 +113,6 @@ private: return tablesFormat == 1 ? "Uint32" : "Uint64";
}
- const char* GetSelectQueueAndShardHash() const;
- const char* GetLoadQueueAndShardHashOrZero() const;
-
void FillQueueVars(
const TString& userName,
const TString& queueName,
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index 23ee4ab5d6..73a7fa71cf 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -715,10 +715,12 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('VisibilityDeadline (DataType 'Uint64)))))) (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) (let dlqIdNumber (Parameter 'DLQ_ID_NUMBER (DataType 'Uint64))) + (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64))) (let dlqShard (Parameter 'DLQ_SHARD (DataType ')__" DLQ_SHARD_TYPE_PARAM R"__())) (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) @@ -922,6 +924,7 @@ const char* const LoadOrRedriveMessageQuery = R"__( const char* const WriteMessageQuery = R"__( ( (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) (let randomId (Parameter 'RANDOM_ID (DataType 'Uint64))) @@ -1041,7 +1044,6 @@ const char* const SetRetentionQuery = R"__( )) (let fields '( ')__" SHARD_COLUMN_NAME_PARAM R"__( - )__" SELECT_QUEUE_AND_SHARD_HASH_PARAM R"__( 'RetentionBoundary )) (let records (Member (SelectRange stateTable range fields '()) 'List)) @@ -1057,9 +1059,6 @@ const char* const SetRetentionQuery = R"__( '('Shard ( Member item ')__" SHARD_COLUMN_NAME_PARAM R"__( )) - '('QueueIdNumberAndShardHash ( - )__" LOAD_QUEUE_AND_SHARD_HASH_OR_ZERO_PARAM R"__( - )) '('RetentionBoundary (Max boundary (Member item 'RetentionBoundary))) '('Updated updated)))))))) @@ -1072,7 +1071,6 @@ const char* const SetRetentionQuery = R"__( (Map updated (lambda '(item) (block '( (let shard (Member item 'Shard)) - (let queueIdNumberAndShardHash (Member item 'QueueIdNumberAndShardHash)) (let row '( )__" STATE_KEYS_PARAM R"__( )) @@ -1139,6 +1137,7 @@ const char* const GetRetentionOffsetQuery = R"__( const char* const LoadInflyQuery = R"__( ( (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) @@ -1171,9 +1170,9 @@ const char* const LoadInflyQuery = R"__( const char* const GetStateQuery = R"__( ( - (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) - (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) - (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) + (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State) @@ -1269,9 +1268,9 @@ const char* const GetUserSettingsQuery = R"__( const char* const GetMessageCountMetricsQuery = R"__( ( - (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) - (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) - (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State) diff --git a/ydb/tests/library/sqs/tables.py b/ydb/tests/library/sqs/tables.py index 433ffb3f33..0381e810fa 100644 --- a/ydb/tests/library/sqs/tables.py +++ b/ydb/tests/library/sqs/tables.py @@ -132,7 +132,13 @@ def create_attibutes_table(root, session, queue_type): def create_state_table(root, session, queue_type): - queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD)) + queue_keys = columns = [ + ('QueueIdNumberHash', ydb.PrimitiveType.Uint64), + ('QueueIdNumber', ydb.PrimitiveType.Uint64), + ] + if queue_type == QueueType.STD: + queue_keys.append(('Shard', ydb.PrimitiveType.Uint32)) + columns = queue_keys + [ ('CleanupTimestamp', ydb.PrimitiveType.Uint64), ('CreatedTimestamp', ydb.PrimitiveType.Uint64), |