diff options
author | alexbogo <alexbogo@ydb.tech> | 2022-11-26 13:55:59 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2022-11-26 13:55:59 +0300 |
commit | f8936387dad2132203132ba4ede801c07990aee4 (patch) | |
tree | 8c5364874519f5418d833cd6e6e1a95e28a816a4 | |
parent | e1d81bf14c5f2a3d8b82345dc28c21d130f984d2 (diff) | |
download | ydb-f8936387dad2132203132ba4ede801c07990aee4.tar.gz |
[ymq] fix dlq for queues in common tables format (v1)
init
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 181 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 6 | ||||
-rw-r--r-- | ydb/core/ymq/queues/fifo/queries.cpp | 41 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 32 |
4 files changed, 156 insertions, 104 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index e042d2ea6cf..0b8d330405f 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -636,62 +636,73 @@ void TQueueLeader::ReadFifoMessages(TReceiveMessageBatchRequestProcessing& reqIn builder.Start(); } +void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& value, TReceiveMessageBatchRequestProcessing& reqInfo) { + const NKikimr::NClient::TValue list(value["result"]); + + if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { + ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); + + const i64 newMessagesCount = value["newMessagesCount"]; + Y_VERIFY(newMessagesCount >= 0); + auto& shardInfo = Shards_[0]; + shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); + } + + reqInfo.Answer->Messages.resize(list.Size()); + for (size_t i = 0; i < list.Size(); ++i) { + const NKikimr::NClient::TValue& data = list[i]["SourceDataFieldsRead"]; + const NKikimr::NClient::TValue& msg = list[i]["SourceMessageFieldsRead"]; + const ui64 receiveTimestamp = msg["FirstReceiveTimestamp"]; + auto& msgAnswer = reqInfo.Answer->Messages[i]; + + msgAnswer.FirstReceiveTimestamp = (receiveTimestamp ? TInstant::MilliSeconds(receiveTimestamp) : reqInfo.LockSendTs); + msgAnswer.ReceiveCount = ui32(msg["ReceiveCount"]) + 1; // since the query returns old receive count value + msgAnswer.MessageId = data["MessageId"]; + msgAnswer.MessageDeduplicationId = data["DedupId"]; + msgAnswer.MessageGroupId = msg["GroupId"]; + msgAnswer.Data = data["Data"]; + msgAnswer.SentTimestamp = TInstant::MilliSeconds(ui64(msg["SentTimestamp"])); + msgAnswer.SequenceNumber = msg["Offset"]; + + msgAnswer.ReceiptHandle.SetMessageGroupId(TString(msg["GroupId"])); + msgAnswer.ReceiptHandle.SetOffset(msgAnswer.SequenceNumber); + msgAnswer.ReceiptHandle.SetReceiveRequestAttemptId(reqInfo.Event->Get()->ReceiveAttemptId); + msgAnswer.ReceiptHandle.SetLockTimestamp(reqInfo.LockSendTs.MilliSeconds()); + msgAnswer.ReceiptHandle.SetShard(0); + + const NKikimr::NClient::TValue senderIdValue = data["SenderId"]; + if (senderIdValue.HaveValue()) { + if (const TString senderId = TString(senderIdValue)) { + msgAnswer.SenderId = senderId; + } + } + + const NKikimr::NClient::TValue attributesValue = data["Attributes"]; + if (attributesValue.HaveValue()) { + msgAnswer.MessageAttributes = attributesValue; + } + } +} + void TQueueLeader::OnFifoMessagesRead(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev, const bool usedDLQ) { auto reqInfoIt = ReceiveMessageRequests_.find(requestId); Y_VERIFY(reqInfoIt != ReceiveMessageRequests_.end()); auto& reqInfo = reqInfoIt->second; + bool dlqExists = true; + bool success = false; if (ev.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { - using NKikimr::NClient::TValue; - const TValue value(TValue::Create(ev.GetExecutionEngineEvaluatedResponse())); - const TValue list(value["result"]); - - if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { - ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); - - const i64 newMessagesCount = value["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - auto& shardInfo = Shards_[0]; - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); + const NKikimr::NClient::TValue value = NKikimr::NClient::TValue::Create(ev.GetExecutionEngineEvaluatedResponse()); + dlqExists = value["dlqExists"]; + if (dlqExists) { + success = true; + OnFifoMessagesReadSuccess(value, reqInfo); } + } - reqInfo.Answer->Messages.resize(list.Size()); - for (size_t i = 0; i < list.Size(); ++i) { - const TValue& data = list[i]["SourceDataFieldsRead"]; - const TValue& msg = list[i]["SourceMessageFieldsRead"]; - const ui64 receiveTimestamp = msg["FirstReceiveTimestamp"]; - auto& msgAnswer = reqInfo.Answer->Messages[i]; - - msgAnswer.FirstReceiveTimestamp = (receiveTimestamp ? TInstant::MilliSeconds(receiveTimestamp) : reqInfo.LockSendTs); - msgAnswer.ReceiveCount = ui32(msg["ReceiveCount"]) + 1; // since the query returns old receive count value - msgAnswer.MessageId = data["MessageId"]; - msgAnswer.MessageDeduplicationId = data["DedupId"]; - msgAnswer.MessageGroupId = msg["GroupId"]; - msgAnswer.Data = data["Data"]; - msgAnswer.SentTimestamp = TInstant::MilliSeconds(ui64(msg["SentTimestamp"])); - msgAnswer.SequenceNumber = msg["Offset"]; - - msgAnswer.ReceiptHandle.SetMessageGroupId(TString(msg["GroupId"])); - msgAnswer.ReceiptHandle.SetOffset(msgAnswer.SequenceNumber); - msgAnswer.ReceiptHandle.SetReceiveRequestAttemptId(reqInfo.Event->Get()->ReceiveAttemptId); - msgAnswer.ReceiptHandle.SetLockTimestamp(reqInfo.LockSendTs.MilliSeconds()); - msgAnswer.ReceiptHandle.SetShard(0); - - const TValue senderIdValue = data["SenderId"]; - if (senderIdValue.HaveValue()) { - if (const TString senderId = TString(senderIdValue)) { - msgAnswer.SenderId = senderId; - } - } - - const TValue attributesValue = data["Attributes"]; - if (attributesValue.HaveValue()) { - msgAnswer.MessageAttributes = attributesValue; - } - } - } else { + if (!success) { const auto errStatus = NKikimr::NTxProxy::TResultStatus::EStatus(ev.GetStatus()); - if (usedDLQ && !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus)) { + if (usedDLQ && (!dlqExists || !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus))) { // it's possible that DLQ was removed, hence it'd be wise to refresh corresponding info DlqInfo_.Clear(); reqInfo.Answer->Failed = false; @@ -735,13 +746,13 @@ void TQueueLeader::LoadStdMessages(TReceiveMessageBatchRequestProcessing& reqInf } } -void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 offset, const TSqsEvents::TEvExecuted::TRecord& ev, const NKikimr::NClient::TValue* messageRecord, const bool ignoreMessageLoadingErrors) { +void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 offset, bool success, const NKikimr::NClient::TValue* messageRecord, const bool ignoreMessageLoadingErrors) { auto reqInfoIt = ReceiveMessageRequests_.find(requestId); Y_VERIFY(reqInfoIt != ReceiveMessageRequests_.end()); auto& reqInfo = reqInfoIt->second; --reqInfo.LoadAnswersLeft; - if (ev.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { + if (success) { bool deleted = true; bool deadlineChanged = true; const bool exists = (*messageRecord)["Exists"]; @@ -820,6 +831,33 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o } } +void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) { + const NKikimr::NClient::TValue list(value["result"]); + Y_VERIFY(list.Size() == batch->Size()); + + if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { + ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); + + const i64 newMessagesCount = value["newMessagesCount"]; + Y_VERIFY(newMessagesCount >= 0); + shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); + } + + THashMap<ui64, const TLoadBatchEntry*> offset2entry; + offset2entry.reserve(batch->Entries.size()); + for (const TLoadBatchEntry& entry : batch->Entries) { + offset2entry.emplace(entry.Offset, &entry); + } + + for (size_t i = 0; i < list.Size(); ++i) { + auto msg = list[i]; + const ui64 offset = msg["Offset"]; + const auto entry = offset2entry.find(offset); + Y_VERIFY(entry != offset2entry.end()); + OnLoadStdMessageResult(entry->second->RequestId, offset, true, &msg, false); + } +} + void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply) { auto& shardInfo = Shards_[shard]; auto& batchingState = shardInfo.LoadBatchingState; @@ -827,37 +865,22 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons Y_VERIFY(batchIt != batchingState.BatchesExecuting.end()); auto batch = batchIt->second; auto status = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus(reply.GetStatus()); - bool ignoreMessageLoadingErrors = false; - if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { - using NKikimr::NClient::TValue; - const TValue value(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); - const TValue list(value["result"]); - Y_VERIFY(list.Size() == batch->Size()); - - if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { - ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); - const i64 newMessagesCount = value["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); - } - - THashMap<ui64, const TLoadBatchEntry*> offset2entry; - offset2entry.reserve(batch->Entries.size()); - for (const TLoadBatchEntry& entry : batch->Entries) { - offset2entry.emplace(entry.Offset, &entry); + bool dlqExists = true; + bool success = false; + if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { + const NKikimr::NClient::TValue value = NKikimr::NClient::TValue::Create(reply.GetExecutionEngineEvaluatedResponse()); + dlqExists = value["dlqExists"]; + if (dlqExists) { + success = true; + OnLoadStdMessagesBatchSuccess(value, shardInfo, batch); } + } - for (size_t i = 0; i < list.Size(); ++i) { - auto msg = list[i]; - const ui64 offset = msg["Offset"]; - const auto entry = offset2entry.find(offset); - Y_VERIFY(entry != offset2entry.end()); - OnLoadStdMessageResult(entry->second->RequestId, offset, reply, &msg, ignoreMessageLoadingErrors); - } - } else { + if (!success) { const auto errStatus = NKikimr::NTxProxy::TResultStatus::EStatus(reply.GetStatus()); - if (usedDLQ && !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus)) { + bool ignoreMessageLoadingErrors = false; + if (usedDLQ && (!dlqExists || !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus))) { // it's possible that DLQ was removed, hence it'd be wise to refresh corresponding info DlqInfo_.Clear(); ignoreMessageLoadingErrors = true; @@ -868,9 +891,11 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons const TLoadBatchEntry& entry = batch->Entries[i]; if (!prevRequestId || *prevRequestId != entry.RequestId) { prevRequestId = &entry.RequestId; - RLOG_SQS_REQ_ERROR(entry.RequestId, "Batch transaction failed: " << reply << ". BatchId: " << batch->BatchId); + RLOG_SQS_REQ_ERROR(entry.RequestId, + "Batch transaction failed: " << reply << ". DlqExists=" << dlqExists << ". BatchId: " << batch->BatchId + ); } - OnLoadStdMessageResult(entry.RequestId, entry.Offset, reply, nullptr, ignoreMessageLoadingErrors); + OnLoadStdMessageResult(entry.RequestId, entry.Offset, success, nullptr, ignoreMessageLoadingErrors); } } batchingState.BatchesExecuting.erase(batchId); diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index aace8e884e9..2b6e6a00571 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -29,6 +29,8 @@ class TQueueLeader : public TActorBootstrapped<TQueueLeader> { struct TDeleteMessageBatchRequestProcessing; struct TChangeMessageVisibilityBatchRequestProcessing; struct TGetRuntimeQueueAttributesRequestProcessing; + struct TShardInfo; + struct TLoadBatch; public: TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser); @@ -116,15 +118,17 @@ private: void LockFifoGroup(TReceiveMessageBatchRequestProcessing& reqInfo); void OnFifoGroupLocked(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev); void ReadFifoMessages(TReceiveMessageBatchRequestProcessing& reqInfo); + void OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& value, TReceiveMessageBatchRequestProcessing& reqInfo); void OnFifoMessagesRead(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev, bool usedDLQ); void GetMessagesFromInfly(TReceiveMessageBatchRequestProcessing& reqInfo); void LoadStdMessages(TReceiveMessageBatchRequestProcessing& reqInfo); - void OnLoadStdMessageResult(const TString& requestId, ui64 offset, const TSqsEvents::TEvExecuted::TRecord& ev, const NKikimr::NClient::TValue* messageRecord, bool ignoreMessageLoadingErrors); + void OnLoadStdMessageResult(const TString& requestId, ui64 offset, bool success, const NKikimr::NClient::TValue* messageRecord, bool ignoreMessageLoadingErrors); void TryReceiveAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo); void WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo); void Reply(TReceiveMessageBatchRequestProcessing& reqInfo); // batching + void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch); void OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply); // delete diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp index 0d5c5239e2c..ecc24b93eb2 100644 --- a/ydb/core/ymq/queues/fifo/queries.cpp +++ b/ydb/core/ymq/queues/fifo/queries.cpp @@ -693,6 +693,7 @@ const char* const ReadMessageQuery = R"__( ))))) (return (Extend + (AsList (SetResult 'dlqExists (Bool 'true))) (AsList (SetResult 'result result)) (AsList (SetResult 'movedMessagesCount (Uint64 '0))) @@ -826,6 +827,7 @@ const char* const ReadOrRedriveMessageQuery = R"__( 'WriteOffset 'LastModifiedTimestamp)) (let dlqStateRead (SelectRow dlqStateTable dlqStateRow dlqStateSelect)) + (let dlqExists (Exists dlqStateRead)) (let dlqSentTimestamp (Max now (Member dlqStateRead 'LastModifiedTimestamp))) (let dlqStartOffset (Add (Member dlqStateRead 'WriteOffset) (Uint64 '1))) @@ -860,14 +862,15 @@ const char* const ReadOrRedriveMessageQuery = R"__( '('MessageCount newSourceMsgCount))) (return (Extend + (AsList (SetResult 'dlqExists dlqExists)) (AsList (SetResult 'result messagesToReturnAsStruct)) (AsList (SetResult 'movedMessagesCount (Length messagesToMoveAsStruct))) (AsList (SetResult 'newMessagesCount newSourceMsgCount)) - (ListIf (HasItems messagesToMoveAsStruct) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate)) - (ListIf (HasItems messagesToMoveAsStruct) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate)) + (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate)) + (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate)) # copy messages to dlq - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let dlqDataRow '( )__" DLQ_ID_KEYS_PARAM R"__( '('RandomId randomId) @@ -879,8 +882,9 @@ const char* const ReadOrRedriveMessageQuery = R"__( '('SenderId (Member (Nth item '1) 'SenderId)) '('MessageId (Member (Nth item '1) 'MessageId)))) (return (UpdateRow dlqDataTable dlqDataRow dlqDataUpdate)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let dlqMsgRow '( )__" DLQ_ID_KEYS_PARAM R"__( '('Offset (Nth item '0)))) @@ -893,8 +897,9 @@ const char* const ReadOrRedriveMessageQuery = R"__( '('FirstReceiveTimestamp (Uint64 '0)) '('SentTimestamp dlqSentTimestamp))) (return (UpdateRow dlqMsgTable dlqMsgRow dlqMessageUpdate)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let dlqSentTsRow '( )__" DLQ_ID_KEYS_PARAM R"__( '('SentTimestamp dlqSentTimestamp) @@ -906,8 +911,9 @@ const char* const ReadOrRedriveMessageQuery = R"__( '('DelayDeadline delayDeadline) '('GroupId (Member (Nth item '1) 'GroupId)))) (return (UpdateRow dlqSentTsIdx dlqSentTsRow dlqSentTsUpdate)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let dlqGroupRow '( )__" DLQ_ID_KEYS_PARAM R"__( '('GroupId (Member (Nth item '1) 'GroupId)))) @@ -930,8 +936,9 @@ const char* const ReadOrRedriveMessageQuery = R"__( (UpdateRow dlqGroupTable dlqGroupRow dlqGroupUpdate) ) ))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let dlqTail (Member (Nth item '1) 'DlqTail)) (let dlqPrevMessageRow '( )__" DLQ_ID_KEYS_PARAM R"__( @@ -944,29 +951,33 @@ const char* const ReadOrRedriveMessageQuery = R"__( (UpdateRow dlqMsgTable dlqPrevMessageRow dlqPrevMessageUpdate) (Void)) ))))) + (AsList (Void))) # remove dead letters' content from source queue - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let row '( )__" QUEUE_ID_KEYS_PARAM R"__( '('RandomId (Member (Nth item '1) 'SourceRandomId)) '('Offset (Member (Nth item '1) 'SourceOffset)))) (return (EraseRow sourceDataTable row)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let row '( )__" QUEUE_ID_KEYS_PARAM R"__( '('Offset (Member (Nth item '1) 'SourceOffset)))) (return (EraseRow sourceMsgTable row)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let row '( )__" QUEUE_ID_KEYS_PARAM R"__( '('SentTimestamp (Member (Nth item '1) 'SourceSentTimestamp)) '('Offset (Member (Nth item '1) 'SourceOffset)))) (return (EraseRow sourceSentTsIdx row)))))) + (AsList (Void))) - (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( + (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '( (let row '( )__" QUEUE_ID_KEYS_PARAM R"__( '('GroupId (Member (Nth item '1) 'GroupId)))) @@ -975,14 +986,14 @@ const char* const ReadOrRedriveMessageQuery = R"__( '('Head (Member (Nth item '1) 'SourceNextOffset)) '('LockTimestamp (Uint64 '0)) '('VisibilityDeadline (Uint64 '0)))) - (return (If (Coalesce (Equal (Member (Nth item '1) 'SourceNextOffset) (Uint64 '0)) (Bool 'false)) (EraseRow sourceGroupTable row) (UpdateRow sourceGroupTable row update))))))) + (AsList (Void))) # just return ordinary messages - (Map messagesToReturnAsStruct (lambda '(item) (block '( + (If dlqExists (Map messagesToReturnAsStruct (lambda '(item) (block '( (let message (Member item 'SourceMessageFieldsRead)) (let row '( )__" QUEUE_ID_KEYS_PARAM R"__( @@ -992,7 +1003,9 @@ const char* const ReadOrRedriveMessageQuery = R"__( (let update '( '('FirstReceiveTimestamp receiveTimestamp) '('ReceiveCount (Add (Member message 'ReceiveCount) (Uint32 '1))))) - (return (UpdateRow sourceMsgTable row update)))))))) + (return (UpdateRow sourceMsgTable row update)))))) + (AsList (Void))) + )) ) )__"; diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index 1df5c5a87e7..9d7339fa3b9 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -686,6 +686,7 @@ const char* const LoadMessageQuery = R"__( (return (Coalesce (Member item 'Valid) (Bool 'false)))))))) (return (Extend + (AsList (SetResult 'dlqExists (Bool 'true))) (AsList (SetResult 'result records)) (AsList (SetResult 'movedMessagesCount (Uint64 '0))) @@ -715,12 +716,12 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('VisibilityDeadline (DataType 'Uint64)))))) (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) - (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) (let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__())) (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) (let dlqIdNumber (Parameter 'DLQ_ID_NUMBER (DataType 'Uint64))) - (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64))) + (let dlqIdNumberHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64))) (let dlqShard (Parameter 'DLQ_SHARD (DataType ')__" DLQ_SHARD_TYPE_PARAM R"__())) (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64))) @@ -825,6 +826,7 @@ const char* const LoadOrRedriveMessageQuery = R"__( 'WriteOffset 'LastModifiedTimestamp)) (let deadLetterStateRead (SelectRow deadLetterStateTable deadLetterStateRow deadLetterStateSelect)) + (let dlqExists (Exists deadLetterStateRead)) (let newDlqMessagesCount (Add (Member deadLetterStateRead 'MessageCount) (Length messagesToMove))) (let newDlqWriteOffset (Add (Member deadLetterStateRead 'WriteOffset) deadLettersCount)) @@ -844,13 +846,14 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('InflyCount (Sub (Member sourceStateRead 'InflyCount) (Length messagesToMove))))) (return (Extend + (AsList (SetResult 'dlqExists dlqExists)) (AsList (SetResult 'result records)) (AsList (SetResult 'movedMessagesCount (Length messagesToMove))) (AsList (SetResult 'newMessagesCount newSourceMessagesCount)) - (AsList (UpdateRow deadLetterStateTable deadLetterStateRow deadLetterStateUpdate)) - (AsList (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate)) + (ListIf dlqExists (UpdateRow deadLetterStateTable deadLetterStateRow deadLetterStateUpdate)) + (ListIf dlqExists (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate)) - (Map messagesToUpdate (lambda '(item) (block '( + (If dlqExists (Map messagesToUpdate (lambda '(item) (block '( (let row '( )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('Offset (Member item 'Offset)))) @@ -861,8 +864,9 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('ReceiveCount (Member item 'ReceiveCount)) '('VisibilityDeadline (Member item 'VisibilityDeadline)))) (return (UpdateRow sourceInflyTable row update)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let msgRow '( )__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__( '('Offset (Add dlqStartOffset (Member item 'DlqIndex))))) @@ -872,8 +876,9 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('SentTimestamp dlqMostRecentTimestamp) '('DelayDeadline delayDeadline))) (return (UpdateRow deadLetterMessagesTable msgRow messageUpdate)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let sentTsRow '( )__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__( '('SentTimestamp dlqMostRecentTimestamp) @@ -883,8 +888,9 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('RandomId readId) '('DelayDeadline delayDeadline))) (return (UpdateRow deadLetterSentTsIdxTable sentTsRow sentTsUpdate)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let dataRow '( )__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__( '('RandomId readId) @@ -896,27 +902,31 @@ const char* const LoadOrRedriveMessageQuery = R"__( '('SenderId (Member item 'SenderId)) '('MessageId (Member item 'MessageId)))) (return (UpdateRow deadLetterMessageDataTable dataRow dataUpdate)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let inflyRow '( )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('Offset (Member item 'Offset)))) (return (EraseRow sourceInflyTable inflyRow)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let dataRow '( )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('RandomId (Member item 'RandomId)) '('Offset (Member item 'Offset)))) (return (EraseRow sourceMessageDataTable dataRow)))))) + (AsList (Void))) - (Map messagesToMove (lambda '(item) (block '( + (If dlqExists (Map messagesToMove (lambda '(item) (block '( (let sentTsRow '( )__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__( '('SentTimestamp (Member item 'SentTimestamp)) '('Offset (Member item 'Offset)))) (return (EraseRow sourceSentTsIdxTable sentTsRow)))))) + (AsList (Void))) )) ) )__"; |