aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2022-11-26 13:55:59 +0300
committeralexbogo <alexbogo@ydb.tech>2022-11-26 13:55:59 +0300
commitf8936387dad2132203132ba4ede801c07990aee4 (patch)
tree8c5364874519f5418d833cd6e6e1a95e28a816a4
parente1d81bf14c5f2a3d8b82345dc28c21d130f984d2 (diff)
downloadydb-f8936387dad2132203132ba4ede801c07990aee4.tar.gz
[ymq] fix dlq for queues in common tables format (v1)
init
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp181
-rw-r--r--ydb/core/ymq/actor/queue_leader.h6
-rw-r--r--ydb/core/ymq/queues/fifo/queries.cpp41
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp32
4 files changed, 156 insertions, 104 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index e042d2ea6cf..0b8d330405f 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -636,62 +636,73 @@ void TQueueLeader::ReadFifoMessages(TReceiveMessageBatchRequestProcessing& reqIn
builder.Start();
}
+void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& value, TReceiveMessageBatchRequestProcessing& reqInfo) {
+ const NKikimr::NClient::TValue list(value["result"]);
+
+ if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
+ ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
+
+ const i64 newMessagesCount = value["newMessagesCount"];
+ Y_VERIFY(newMessagesCount >= 0);
+ auto& shardInfo = Shards_[0];
+ shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
+ }
+
+ reqInfo.Answer->Messages.resize(list.Size());
+ for (size_t i = 0; i < list.Size(); ++i) {
+ const NKikimr::NClient::TValue& data = list[i]["SourceDataFieldsRead"];
+ const NKikimr::NClient::TValue& msg = list[i]["SourceMessageFieldsRead"];
+ const ui64 receiveTimestamp = msg["FirstReceiveTimestamp"];
+ auto& msgAnswer = reqInfo.Answer->Messages[i];
+
+ msgAnswer.FirstReceiveTimestamp = (receiveTimestamp ? TInstant::MilliSeconds(receiveTimestamp) : reqInfo.LockSendTs);
+ msgAnswer.ReceiveCount = ui32(msg["ReceiveCount"]) + 1; // since the query returns old receive count value
+ msgAnswer.MessageId = data["MessageId"];
+ msgAnswer.MessageDeduplicationId = data["DedupId"];
+ msgAnswer.MessageGroupId = msg["GroupId"];
+ msgAnswer.Data = data["Data"];
+ msgAnswer.SentTimestamp = TInstant::MilliSeconds(ui64(msg["SentTimestamp"]));
+ msgAnswer.SequenceNumber = msg["Offset"];
+
+ msgAnswer.ReceiptHandle.SetMessageGroupId(TString(msg["GroupId"]));
+ msgAnswer.ReceiptHandle.SetOffset(msgAnswer.SequenceNumber);
+ msgAnswer.ReceiptHandle.SetReceiveRequestAttemptId(reqInfo.Event->Get()->ReceiveAttemptId);
+ msgAnswer.ReceiptHandle.SetLockTimestamp(reqInfo.LockSendTs.MilliSeconds());
+ msgAnswer.ReceiptHandle.SetShard(0);
+
+ const NKikimr::NClient::TValue senderIdValue = data["SenderId"];
+ if (senderIdValue.HaveValue()) {
+ if (const TString senderId = TString(senderIdValue)) {
+ msgAnswer.SenderId = senderId;
+ }
+ }
+
+ const NKikimr::NClient::TValue attributesValue = data["Attributes"];
+ if (attributesValue.HaveValue()) {
+ msgAnswer.MessageAttributes = attributesValue;
+ }
+ }
+}
+
void TQueueLeader::OnFifoMessagesRead(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev, const bool usedDLQ) {
auto reqInfoIt = ReceiveMessageRequests_.find(requestId);
Y_VERIFY(reqInfoIt != ReceiveMessageRequests_.end());
auto& reqInfo = reqInfoIt->second;
+ bool dlqExists = true;
+ bool success = false;
if (ev.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
- using NKikimr::NClient::TValue;
- const TValue value(TValue::Create(ev.GetExecutionEngineEvaluatedResponse()));
- const TValue list(value["result"]);
-
- if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
- ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
-
- const i64 newMessagesCount = value["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- auto& shardInfo = Shards_[0];
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
+ const NKikimr::NClient::TValue value = NKikimr::NClient::TValue::Create(ev.GetExecutionEngineEvaluatedResponse());
+ dlqExists = value["dlqExists"];
+ if (dlqExists) {
+ success = true;
+ OnFifoMessagesReadSuccess(value, reqInfo);
}
+ }
- reqInfo.Answer->Messages.resize(list.Size());
- for (size_t i = 0; i < list.Size(); ++i) {
- const TValue& data = list[i]["SourceDataFieldsRead"];
- const TValue& msg = list[i]["SourceMessageFieldsRead"];
- const ui64 receiveTimestamp = msg["FirstReceiveTimestamp"];
- auto& msgAnswer = reqInfo.Answer->Messages[i];
-
- msgAnswer.FirstReceiveTimestamp = (receiveTimestamp ? TInstant::MilliSeconds(receiveTimestamp) : reqInfo.LockSendTs);
- msgAnswer.ReceiveCount = ui32(msg["ReceiveCount"]) + 1; // since the query returns old receive count value
- msgAnswer.MessageId = data["MessageId"];
- msgAnswer.MessageDeduplicationId = data["DedupId"];
- msgAnswer.MessageGroupId = msg["GroupId"];
- msgAnswer.Data = data["Data"];
- msgAnswer.SentTimestamp = TInstant::MilliSeconds(ui64(msg["SentTimestamp"]));
- msgAnswer.SequenceNumber = msg["Offset"];
-
- msgAnswer.ReceiptHandle.SetMessageGroupId(TString(msg["GroupId"]));
- msgAnswer.ReceiptHandle.SetOffset(msgAnswer.SequenceNumber);
- msgAnswer.ReceiptHandle.SetReceiveRequestAttemptId(reqInfo.Event->Get()->ReceiveAttemptId);
- msgAnswer.ReceiptHandle.SetLockTimestamp(reqInfo.LockSendTs.MilliSeconds());
- msgAnswer.ReceiptHandle.SetShard(0);
-
- const TValue senderIdValue = data["SenderId"];
- if (senderIdValue.HaveValue()) {
- if (const TString senderId = TString(senderIdValue)) {
- msgAnswer.SenderId = senderId;
- }
- }
-
- const TValue attributesValue = data["Attributes"];
- if (attributesValue.HaveValue()) {
- msgAnswer.MessageAttributes = attributesValue;
- }
- }
- } else {
+ if (!success) {
const auto errStatus = NKikimr::NTxProxy::TResultStatus::EStatus(ev.GetStatus());
- if (usedDLQ && !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus)) {
+ if (usedDLQ && (!dlqExists || !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus))) {
// it's possible that DLQ was removed, hence it'd be wise to refresh corresponding info
DlqInfo_.Clear();
reqInfo.Answer->Failed = false;
@@ -735,13 +746,13 @@ void TQueueLeader::LoadStdMessages(TReceiveMessageBatchRequestProcessing& reqInf
}
}
-void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 offset, const TSqsEvents::TEvExecuted::TRecord& ev, const NKikimr::NClient::TValue* messageRecord, const bool ignoreMessageLoadingErrors) {
+void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 offset, bool success, const NKikimr::NClient::TValue* messageRecord, const bool ignoreMessageLoadingErrors) {
auto reqInfoIt = ReceiveMessageRequests_.find(requestId);
Y_VERIFY(reqInfoIt != ReceiveMessageRequests_.end());
auto& reqInfo = reqInfoIt->second;
--reqInfo.LoadAnswersLeft;
- if (ev.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
+ if (success) {
bool deleted = true;
bool deadlineChanged = true;
const bool exists = (*messageRecord)["Exists"];
@@ -820,6 +831,33 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o
}
}
+void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) {
+ const NKikimr::NClient::TValue list(value["result"]);
+ Y_VERIFY(list.Size() == batch->Size());
+
+ if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
+ ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
+
+ const i64 newMessagesCount = value["newMessagesCount"];
+ Y_VERIFY(newMessagesCount >= 0);
+ shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
+ }
+
+ THashMap<ui64, const TLoadBatchEntry*> offset2entry;
+ offset2entry.reserve(batch->Entries.size());
+ for (const TLoadBatchEntry& entry : batch->Entries) {
+ offset2entry.emplace(entry.Offset, &entry);
+ }
+
+ for (size_t i = 0; i < list.Size(); ++i) {
+ auto msg = list[i];
+ const ui64 offset = msg["Offset"];
+ const auto entry = offset2entry.find(offset);
+ Y_VERIFY(entry != offset2entry.end());
+ OnLoadStdMessageResult(entry->second->RequestId, offset, true, &msg, false);
+ }
+}
+
void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply) {
auto& shardInfo = Shards_[shard];
auto& batchingState = shardInfo.LoadBatchingState;
@@ -827,37 +865,22 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons
Y_VERIFY(batchIt != batchingState.BatchesExecuting.end());
auto batch = batchIt->second;
auto status = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus(reply.GetStatus());
- bool ignoreMessageLoadingErrors = false;
- if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
- using NKikimr::NClient::TValue;
- const TValue value(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
- const TValue list(value["result"]);
- Y_VERIFY(list.Size() == batch->Size());
-
- if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
- ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
- const i64 newMessagesCount = value["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
- }
-
- THashMap<ui64, const TLoadBatchEntry*> offset2entry;
- offset2entry.reserve(batch->Entries.size());
- for (const TLoadBatchEntry& entry : batch->Entries) {
- offset2entry.emplace(entry.Offset, &entry);
+ bool dlqExists = true;
+ bool success = false;
+ if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
+ const NKikimr::NClient::TValue value = NKikimr::NClient::TValue::Create(reply.GetExecutionEngineEvaluatedResponse());
+ dlqExists = value["dlqExists"];
+ if (dlqExists) {
+ success = true;
+ OnLoadStdMessagesBatchSuccess(value, shardInfo, batch);
}
+ }
- for (size_t i = 0; i < list.Size(); ++i) {
- auto msg = list[i];
- const ui64 offset = msg["Offset"];
- const auto entry = offset2entry.find(offset);
- Y_VERIFY(entry != offset2entry.end());
- OnLoadStdMessageResult(entry->second->RequestId, offset, reply, &msg, ignoreMessageLoadingErrors);
- }
- } else {
+ if (!success) {
const auto errStatus = NKikimr::NTxProxy::TResultStatus::EStatus(reply.GetStatus());
- if (usedDLQ && !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus)) {
+ bool ignoreMessageLoadingErrors = false;
+ if (usedDLQ && (!dlqExists || !NTxProxy::TResultStatus::IsSoftErrorWithoutSideEffects(errStatus))) {
// it's possible that DLQ was removed, hence it'd be wise to refresh corresponding info
DlqInfo_.Clear();
ignoreMessageLoadingErrors = true;
@@ -868,9 +891,11 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons
const TLoadBatchEntry& entry = batch->Entries[i];
if (!prevRequestId || *prevRequestId != entry.RequestId) {
prevRequestId = &entry.RequestId;
- RLOG_SQS_REQ_ERROR(entry.RequestId, "Batch transaction failed: " << reply << ". BatchId: " << batch->BatchId);
+ RLOG_SQS_REQ_ERROR(entry.RequestId,
+ "Batch transaction failed: " << reply << ". DlqExists=" << dlqExists << ". BatchId: " << batch->BatchId
+ );
}
- OnLoadStdMessageResult(entry.RequestId, entry.Offset, reply, nullptr, ignoreMessageLoadingErrors);
+ OnLoadStdMessageResult(entry.RequestId, entry.Offset, success, nullptr, ignoreMessageLoadingErrors);
}
}
batchingState.BatchesExecuting.erase(batchId);
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index aace8e884e9..2b6e6a00571 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -29,6 +29,8 @@ class TQueueLeader : public TActorBootstrapped<TQueueLeader> {
struct TDeleteMessageBatchRequestProcessing;
struct TChangeMessageVisibilityBatchRequestProcessing;
struct TGetRuntimeQueueAttributesRequestProcessing;
+ struct TShardInfo;
+ struct TLoadBatch;
public:
TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser);
@@ -116,15 +118,17 @@ private:
void LockFifoGroup(TReceiveMessageBatchRequestProcessing& reqInfo);
void OnFifoGroupLocked(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev);
void ReadFifoMessages(TReceiveMessageBatchRequestProcessing& reqInfo);
+ void OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& value, TReceiveMessageBatchRequestProcessing& reqInfo);
void OnFifoMessagesRead(const TString& requestId, const TSqsEvents::TEvExecuted::TRecord& ev, bool usedDLQ);
void GetMessagesFromInfly(TReceiveMessageBatchRequestProcessing& reqInfo);
void LoadStdMessages(TReceiveMessageBatchRequestProcessing& reqInfo);
- void OnLoadStdMessageResult(const TString& requestId, ui64 offset, const TSqsEvents::TEvExecuted::TRecord& ev, const NKikimr::NClient::TValue* messageRecord, bool ignoreMessageLoadingErrors);
+ void OnLoadStdMessageResult(const TString& requestId, ui64 offset, bool success, const NKikimr::NClient::TValue* messageRecord, bool ignoreMessageLoadingErrors);
void TryReceiveAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo);
void WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo);
void Reply(TReceiveMessageBatchRequestProcessing& reqInfo);
// batching
+ void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch);
void OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply);
// delete
diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp
index 0d5c5239e2c..ecc24b93eb2 100644
--- a/ydb/core/ymq/queues/fifo/queries.cpp
+++ b/ydb/core/ymq/queues/fifo/queries.cpp
@@ -693,6 +693,7 @@ const char* const ReadMessageQuery = R"__(
)))))
(return (Extend
+ (AsList (SetResult 'dlqExists (Bool 'true)))
(AsList (SetResult 'result result))
(AsList (SetResult 'movedMessagesCount (Uint64 '0)))
@@ -826,6 +827,7 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'WriteOffset
'LastModifiedTimestamp))
(let dlqStateRead (SelectRow dlqStateTable dlqStateRow dlqStateSelect))
+ (let dlqExists (Exists dlqStateRead))
(let dlqSentTimestamp (Max now (Member dlqStateRead 'LastModifiedTimestamp)))
(let dlqStartOffset (Add (Member dlqStateRead 'WriteOffset) (Uint64 '1)))
@@ -860,14 +862,15 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'('MessageCount newSourceMsgCount)))
(return (Extend
+ (AsList (SetResult 'dlqExists dlqExists))
(AsList (SetResult 'result messagesToReturnAsStruct))
(AsList (SetResult 'movedMessagesCount (Length messagesToMoveAsStruct)))
(AsList (SetResult 'newMessagesCount newSourceMsgCount))
- (ListIf (HasItems messagesToMoveAsStruct) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate))
- (ListIf (HasItems messagesToMoveAsStruct) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate))
+ (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate))
+ (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate))
# copy messages to dlq
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let dlqDataRow '(
)__" DLQ_ID_KEYS_PARAM R"__(
'('RandomId randomId)
@@ -879,8 +882,9 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'('SenderId (Member (Nth item '1) 'SenderId))
'('MessageId (Member (Nth item '1) 'MessageId))))
(return (UpdateRow dlqDataTable dlqDataRow dlqDataUpdate))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let dlqMsgRow '(
)__" DLQ_ID_KEYS_PARAM R"__(
'('Offset (Nth item '0))))
@@ -893,8 +897,9 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'('FirstReceiveTimestamp (Uint64 '0))
'('SentTimestamp dlqSentTimestamp)))
(return (UpdateRow dlqMsgTable dlqMsgRow dlqMessageUpdate))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let dlqSentTsRow '(
)__" DLQ_ID_KEYS_PARAM R"__(
'('SentTimestamp dlqSentTimestamp)
@@ -906,8 +911,9 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'('DelayDeadline delayDeadline)
'('GroupId (Member (Nth item '1) 'GroupId))))
(return (UpdateRow dlqSentTsIdx dlqSentTsRow dlqSentTsUpdate))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let dlqGroupRow '(
)__" DLQ_ID_KEYS_PARAM R"__(
'('GroupId (Member (Nth item '1) 'GroupId))))
@@ -930,8 +936,9 @@ const char* const ReadOrRedriveMessageQuery = R"__(
(UpdateRow dlqGroupTable dlqGroupRow dlqGroupUpdate)
)
)))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let dlqTail (Member (Nth item '1) 'DlqTail))
(let dlqPrevMessageRow '(
)__" DLQ_ID_KEYS_PARAM R"__(
@@ -944,29 +951,33 @@ const char* const ReadOrRedriveMessageQuery = R"__(
(UpdateRow dlqMsgTable dlqPrevMessageRow dlqPrevMessageUpdate)
(Void))
)))))
+ (AsList (Void)))
# remove dead letters' content from source queue
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let row '(
)__" QUEUE_ID_KEYS_PARAM R"__(
'('RandomId (Member (Nth item '1) 'SourceRandomId))
'('Offset (Member (Nth item '1) 'SourceOffset))))
(return (EraseRow sourceDataTable row))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let row '(
)__" QUEUE_ID_KEYS_PARAM R"__(
'('Offset (Member (Nth item '1) 'SourceOffset))))
(return (EraseRow sourceMsgTable row))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let row '(
)__" QUEUE_ID_KEYS_PARAM R"__(
'('SentTimestamp (Member (Nth item '1) 'SourceSentTimestamp))
'('Offset (Member (Nth item '1) 'SourceOffset))))
(return (EraseRow sourceSentTsIdx row))))))
+ (AsList (Void)))
- (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
+ (If dlqExists (Map dlqMessagesInfoWithProperIndexesSorted (lambda '(item) (block '(
(let row '(
)__" QUEUE_ID_KEYS_PARAM R"__(
'('GroupId (Member (Nth item '1) 'GroupId))))
@@ -975,14 +986,14 @@ const char* const ReadOrRedriveMessageQuery = R"__(
'('Head (Member (Nth item '1) 'SourceNextOffset))
'('LockTimestamp (Uint64 '0))
'('VisibilityDeadline (Uint64 '0))))
-
(return
(If (Coalesce (Equal (Member (Nth item '1) 'SourceNextOffset) (Uint64 '0)) (Bool 'false))
(EraseRow sourceGroupTable row)
(UpdateRow sourceGroupTable row update)))))))
+ (AsList (Void)))
# just return ordinary messages
- (Map messagesToReturnAsStruct (lambda '(item) (block '(
+ (If dlqExists (Map messagesToReturnAsStruct (lambda '(item) (block '(
(let message (Member item 'SourceMessageFieldsRead))
(let row '(
)__" QUEUE_ID_KEYS_PARAM R"__(
@@ -992,7 +1003,9 @@ const char* const ReadOrRedriveMessageQuery = R"__(
(let update '(
'('FirstReceiveTimestamp receiveTimestamp)
'('ReceiveCount (Add (Member message 'ReceiveCount) (Uint32 '1)))))
- (return (UpdateRow sourceMsgTable row update))))))))
+ (return (UpdateRow sourceMsgTable row update))))))
+ (AsList (Void)))
+ ))
)
)__";
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 1df5c5a87e7..9d7339fa3b9 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -686,6 +686,7 @@ const char* const LoadMessageQuery = R"__(
(return (Coalesce (Member item 'Valid) (Bool 'false))))))))
(return (Extend
+ (AsList (SetResult 'dlqExists (Bool 'true)))
(AsList (SetResult 'result records))
(AsList (SetResult 'movedMessagesCount (Uint64 '0)))
@@ -715,12 +716,12 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('VisibilityDeadline (DataType 'Uint64))))))
(let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
- (let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
(let shard (Parameter 'SHARD (DataType ')__" SHARD_TYPE_PARAM R"__()))
(let queueIdNumberAndShardHash (Parameter 'QUEUE_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
(let dlqIdNumber (Parameter 'DLQ_ID_NUMBER (DataType 'Uint64)))
- (let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64)))
+ (let dlqIdNumberHash (Parameter 'DLQ_ID_NUMBER_HASH (DataType 'Uint64)))
(let dlqShard (Parameter 'DLQ_SHARD (DataType ')__" DLQ_SHARD_TYPE_PARAM R"__()))
(let dlqIdNumberAndShardHash (Parameter 'DLQ_ID_NUMBER_AND_SHARD_HASH (DataType 'Uint64)))
@@ -825,6 +826,7 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'WriteOffset
'LastModifiedTimestamp))
(let deadLetterStateRead (SelectRow deadLetterStateTable deadLetterStateRow deadLetterStateSelect))
+ (let dlqExists (Exists deadLetterStateRead))
(let newDlqMessagesCount (Add (Member deadLetterStateRead 'MessageCount) (Length messagesToMove)))
(let newDlqWriteOffset (Add (Member deadLetterStateRead 'WriteOffset) deadLettersCount))
@@ -844,13 +846,14 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('InflyCount (Sub (Member sourceStateRead 'InflyCount) (Length messagesToMove)))))
(return (Extend
+ (AsList (SetResult 'dlqExists dlqExists))
(AsList (SetResult 'result records))
(AsList (SetResult 'movedMessagesCount (Length messagesToMove)))
(AsList (SetResult 'newMessagesCount newSourceMessagesCount))
- (AsList (UpdateRow deadLetterStateTable deadLetterStateRow deadLetterStateUpdate))
- (AsList (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate))
+ (ListIf dlqExists (UpdateRow deadLetterStateTable deadLetterStateRow deadLetterStateUpdate))
+ (ListIf dlqExists (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate))
- (Map messagesToUpdate (lambda '(item) (block '(
+ (If dlqExists (Map messagesToUpdate (lambda '(item) (block '(
(let row '(
)__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('Offset (Member item 'Offset))))
@@ -861,8 +864,9 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('ReceiveCount (Member item 'ReceiveCount))
'('VisibilityDeadline (Member item 'VisibilityDeadline))))
(return (UpdateRow sourceInflyTable row update))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let msgRow '(
)__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__(
'('Offset (Add dlqStartOffset (Member item 'DlqIndex)))))
@@ -872,8 +876,9 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('SentTimestamp dlqMostRecentTimestamp)
'('DelayDeadline delayDeadline)))
(return (UpdateRow deadLetterMessagesTable msgRow messageUpdate))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let sentTsRow '(
)__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__(
'('SentTimestamp dlqMostRecentTimestamp)
@@ -883,8 +888,9 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('RandomId readId)
'('DelayDeadline delayDeadline)))
(return (UpdateRow deadLetterSentTsIdxTable sentTsRow sentTsUpdate))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let dataRow '(
)__" DLQ_ID_AND_SHARD_KEYS_PARAM R"__(
'('RandomId readId)
@@ -896,27 +902,31 @@ const char* const LoadOrRedriveMessageQuery = R"__(
'('SenderId (Member item 'SenderId))
'('MessageId (Member item 'MessageId))))
(return (UpdateRow deadLetterMessageDataTable dataRow dataUpdate))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let inflyRow '(
)__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('Offset (Member item 'Offset))))
(return (EraseRow sourceInflyTable inflyRow))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let dataRow '(
)__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('RandomId (Member item 'RandomId))
'('Offset (Member item 'Offset))))
(return (EraseRow sourceMessageDataTable dataRow))))))
+ (AsList (Void)))
- (Map messagesToMove (lambda '(item) (block '(
+ (If dlqExists (Map messagesToMove (lambda '(item) (block '(
(let sentTsRow '(
)__" QUEUE_ID_AND_SHARD_KEYS_PARAM R"__(
'('SentTimestamp (Member item 'SentTimestamp))
'('Offset (Member item 'Offset))))
(return (EraseRow sourceSentTsIdxTable sentTsRow))))))
+ (AsList (Void)))
))
)
)__";