aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-04-18 17:55:37 +0300
committeralexbogo <alexbogo@ydb.tech>2023-04-18 17:55:37 +0300
commita2c46b2d7a9bd81e2240086438724e37d3d936db (patch)
treead7049ae435f88a4dc2e2ea418f2698f007fe67d
parente318f0163f16e2a3e5a139c5fd8170fa7dd4af01 (diff)
downloadydb-a2c46b2d7a9bd81e2240086438724e37d3d936db.tar.gz
[ymq] detect that queue deleted on GetQueueAttributes
Возможен рейс, когда лидер очереди еще жив, но очередь удалена. Необходимо поддержать в ответе mimiKQL информацию об этом и в обработчике поддерживаем ответ пользователю об отсутствии очереди. В рамках этого ревью проверка что очередь удален производиться для запросов на получение атрибутов очереди.
-rw-r--r--ydb/core/ymq/actor/get_queue_attributes.cpp78
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp98
-rw-r--r--ydb/core/ymq/actor/queue_leader.h2
-rw-r--r--ydb/core/ymq/queues/fifo/queries.cpp5
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp5
-rw-r--r--ydb/tests/functional/sqs/common/test_queues_managing.py23
6 files changed, 128 insertions, 83 deletions
diff --git a/ydb/core/ymq/actor/get_queue_attributes.cpp b/ydb/core/ymq/actor/get_queue_attributes.cpp
index c9f7ec3084e..5dacb354c01 100644
--- a/ydb/core/ymq/actor/get_queue_attributes.cpp
+++ b/ydb/core/ymq/actor/get_queue_attributes.cpp
@@ -182,51 +182,55 @@ private:
const auto& record = ev->Get()->Record;
const ui32 status = record.GetStatus();
auto* result = Response_.MutableGetQueueAttributes();
+ bool queueExists = true;
if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
const TValue val(TValue::Create(record.GetExecutionEngineEvaluatedResponse()));
- const TValue& attrs(val["attrs"]);
+ queueExists = val["queueExists"];
+ if (queueExists) {
+ const TValue& attrs(val["attrs"]);
- if (HasAttributeName("ContentBasedDeduplication")) {
- result->SetContentBasedDeduplication(bool(attrs["ContentBasedDeduplication"]));
- }
- if (HasAttributeName("DelaySeconds")) {
- result->SetDelaySeconds(TDuration::MilliSeconds(ui64(attrs["DelaySeconds"])).Seconds());
- }
- if (HasAttributeName("FifoQueue")) {
- result->SetFifoQueue(bool(attrs["FifoQueue"]));
- }
- if (HasAttributeName("MaximumMessageSize")) {
- result->SetMaximumMessageSize(ui64(attrs["MaximumMessageSize"]));
- }
- if (HasAttributeName("MessageRetentionPeriod")) {
- result->SetMessageRetentionPeriod(TDuration::MilliSeconds(ui64(attrs["MessageRetentionPeriod"])).Seconds());
- }
- if (HasAttributeName("ReceiveMessageWaitTimeSeconds")) {
- result->SetReceiveMessageWaitTimeSeconds(TDuration::MilliSeconds(ui64(attrs["ReceiveMessageWaitTime"])).Seconds());
- }
- if (HasAttributeName("VisibilityTimeout")) {
- result->SetVisibilityTimeout(TDuration::MilliSeconds(ui64(attrs["VisibilityTimeout"])).Seconds());
- }
- if (HasAttributeName("RedrivePolicy")) {
- const TValue& dlqArn(attrs["DlqArn"]);
- if (dlqArn.HaveValue() && !TString(dlqArn).empty()) {
- // the attributes can't be set separately, so we check only one
- TRedrivePolicy redrivePolicy;
- redrivePolicy.TargetArn = TString(dlqArn);
- redrivePolicy.MaxReceiveCount = ui64(attrs["MaxReceiveCount"]);
- result->SetRedrivePolicy(redrivePolicy.ToJson());
+ if (HasAttributeName("ContentBasedDeduplication")) {
+ result->SetContentBasedDeduplication(bool(attrs["ContentBasedDeduplication"]));
+ }
+ if (HasAttributeName("DelaySeconds")) {
+ result->SetDelaySeconds(TDuration::MilliSeconds(ui64(attrs["DelaySeconds"])).Seconds());
+ }
+ if (HasAttributeName("FifoQueue")) {
+ result->SetFifoQueue(bool(attrs["FifoQueue"]));
+ }
+ if (HasAttributeName("MaximumMessageSize")) {
+ result->SetMaximumMessageSize(ui64(attrs["MaximumMessageSize"]));
}
+ if (HasAttributeName("MessageRetentionPeriod")) {
+ result->SetMessageRetentionPeriod(TDuration::MilliSeconds(ui64(attrs["MessageRetentionPeriod"])).Seconds());
+ }
+ if (HasAttributeName("ReceiveMessageWaitTimeSeconds")) {
+ result->SetReceiveMessageWaitTimeSeconds(TDuration::MilliSeconds(ui64(attrs["ReceiveMessageWaitTime"])).Seconds());
+ }
+ if (HasAttributeName("VisibilityTimeout")) {
+ result->SetVisibilityTimeout(TDuration::MilliSeconds(ui64(attrs["VisibilityTimeout"])).Seconds());
+ }
+ if (HasAttributeName("RedrivePolicy")) {
+ const TValue& dlqArn(attrs["DlqArn"]);
+ if (dlqArn.HaveValue() && !TString(dlqArn).empty()) {
+ // the attributes can't be set separately, so we check only one
+ TRedrivePolicy redrivePolicy;
+ redrivePolicy.TargetArn = TString(dlqArn);
+ redrivePolicy.MaxReceiveCount = ui64(attrs["MaxReceiveCount"]);
+ result->SetRedrivePolicy(redrivePolicy.ToJson());
+ }
+ }
+
+ --WaitCount_;
+ ReplyIfReady();
+ return;
}
- } else {
- RLOG_SQS_ERROR("Get queue attributes query failed");
- MakeError(result, NErrors::INTERNAL_FAILURE);
- SendReplyAndDie();
- return;
}
- --WaitCount_;
- ReplyIfReady();
+ RLOG_SQS_ERROR("Get queue attributes query failed, queue exists: " << queueExists << ", answer: " << record);
+ MakeError(result, queueExists ? NErrors::INTERNAL_FAILURE : NErrors::NON_EXISTENT_QUEUE);
+ SendReplyAndDie();
}
void HandleRuntimeAttributes(TSqsEvents::TEvGetRuntimeQueueAttributesResponse::TPtr& ev) {
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 32472412e88..199119264d1 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -1365,15 +1365,21 @@ void TQueueLeader::AnswerGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr&
Send(req->Sender, std::move(resp));
}
-void TQueueLeader::AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev) {
+void TQueueLeader::AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev, bool queueRemoved) {
auto answer = MakeHolder<TSqsEvents::TEvConfiguration>();
answer->RootUrl = RootUrl_;
answer->SqsCoreCounters = Counters_->RootCounters.SqsCounters;
answer->QueueCounters = Counters_;
answer->UserCounters = UserCounters_;
- answer->Fail = true;
+
answer->SchemeCache = SchemeCache_;
answer->QuoterResources = QuoterResources_;
+ if (queueRemoved) {
+ answer->UserExists = true;
+ answer->QueueExists = false;
+ } else {
+ answer->Fail = true;
+ }
Send(ev->Sender, answer.Release());
}
@@ -1512,54 +1518,60 @@ void TQueueLeader::AskQueueAttributes() {
void TQueueLeader::OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev) {
const ui32 status = ev.GetStatus();
+ bool queueExists = true;
if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
using NKikimr::NClient::TValue;
const TValue val(TValue::Create(ev.GetExecutionEngineEvaluatedResponse()));
- const TValue& attrs(val["attrs"]);
-
- TSqsEvents::TQueueAttributes attributes;
- attributes.ContentBasedDeduplication = attrs["ContentBasedDeduplication"];
- attributes.DelaySeconds = TDuration::MilliSeconds(attrs["DelaySeconds"]);
- attributes.FifoQueue = attrs["FifoQueue"];
- attributes.MaximumMessageSize = attrs["MaximumMessageSize"];
- attributes.MessageRetentionPeriod = TDuration::MilliSeconds(attrs["MessageRetentionPeriod"]);
- attributes.ReceiveMessageWaitTime = TDuration::MilliSeconds(attrs["ReceiveMessageWaitTime"]);
- attributes.VisibilityTimeout = TDuration::MilliSeconds(attrs["VisibilityTimeout"]);
-
- const TValue showDetailedCountersDeadline = attrs["ShowDetailedCountersDeadline"];
- if (showDetailedCountersDeadline.HaveValue()) {
- const ui64 ms = showDetailedCountersDeadline;
- Counters_->ShowDetailedCounters(TInstant::MilliSeconds(ms));
- }
-
- // update dead letter queue info
- const auto& dlqNameVal(attrs["DlqName"]);
- const auto& maxReceiveCountVal(attrs["MaxReceiveCount"]);
- if (dlqNameVal.HaveValue() && maxReceiveCountVal.HaveValue()) {
- TTargetDlqInfo info;
- info.DlqName = TString(dlqNameVal);
- info.MaxReceiveCount = ui64(maxReceiveCountVal);
- if (info.DlqName && info.MaxReceiveCount) {
- DlqInfo_ = info;
- // now we have to discover queue id and version
- Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetQueueId("DLQ", UserName_, info.DlqName, FolderId_));
- } else {
- DlqInfo_.Clear();
+
+ queueExists = val["queueExists"];
+ if (queueExists) {
+ const TValue& attrs(val["attrs"]);
+
+ TSqsEvents::TQueueAttributes attributes;
+ attributes.ContentBasedDeduplication = attrs["ContentBasedDeduplication"];
+ attributes.DelaySeconds = TDuration::MilliSeconds(attrs["DelaySeconds"]);
+ attributes.FifoQueue = attrs["FifoQueue"];
+ attributes.MaximumMessageSize = attrs["MaximumMessageSize"];
+ attributes.MessageRetentionPeriod = TDuration::MilliSeconds(attrs["MessageRetentionPeriod"]);
+ attributes.ReceiveMessageWaitTime = TDuration::MilliSeconds(attrs["ReceiveMessageWaitTime"]);
+ attributes.VisibilityTimeout = TDuration::MilliSeconds(attrs["VisibilityTimeout"]);
+
+ const TValue showDetailedCountersDeadline = attrs["ShowDetailedCountersDeadline"];
+ if (showDetailedCountersDeadline.HaveValue()) {
+ const ui64 ms = showDetailedCountersDeadline;
+ Counters_->ShowDetailedCounters(TInstant::MilliSeconds(ms));
}
- }
- QueueAttributes_ = attributes;
- AttributesUpdateTime_ = TActivationContext::Now();
- for (auto& req : GetConfigurationRequests_) {
- AnswerGetConfiguration(req);
- }
- GetConfigurationRequests_.clear();
- } else {
- for (auto& req : GetConfigurationRequests_) {
- AnswerFailed(req);
+ // update dead letter queue info
+ const auto& dlqNameVal(attrs["DlqName"]);
+ const auto& maxReceiveCountVal(attrs["MaxReceiveCount"]);
+ if (dlqNameVal.HaveValue() && maxReceiveCountVal.HaveValue()) {
+ TTargetDlqInfo info;
+ info.DlqName = TString(dlqNameVal);
+ info.MaxReceiveCount = ui64(maxReceiveCountVal);
+ if (info.DlqName && info.MaxReceiveCount) {
+ DlqInfo_ = info;
+ // now we have to discover queue id and version
+ Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetQueueId("DLQ", UserName_, info.DlqName, FolderId_));
+ } else {
+ DlqInfo_.Clear();
+ }
+ }
+
+ QueueAttributes_ = attributes;
+ AttributesUpdateTime_ = TActivationContext::Now();
+ for (auto& req : GetConfigurationRequests_) {
+ AnswerGetConfiguration(req);
+ }
+ GetConfigurationRequests_.clear();
+ return;
}
- GetConfigurationRequests_.clear();
}
+
+ for (auto& req : GetConfigurationRequests_) {
+ AnswerFailed(req, !queueExists);
+ }
+ GetConfigurationRequests_.clear();
}
void TQueueLeader::HandleQueueId(TSqsEvents::TEvQueueId::TPtr& ev) {
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index ef68e5c0d89..6178288ddbf 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -100,7 +100,7 @@ private:
void RemoveCachedRequest(size_t shard, size_t idx);
void CreateBackgroundActors();
void AnswerGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr& req);
- void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev);
+ void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev, bool queueRemoved = false);
void AskQueueAttributes();
void OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev);
void MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason);
diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp
index 424fcf5c856..60c080198ca 100644
--- a/ydb/core/ymq/queues/fifo/queries.cpp
+++ b/ydb/core/ymq/queues/fifo/queries.cpp
@@ -533,8 +533,11 @@ const char* const InternalGetQueueAttributesQuery = R"__(
'VisibilityTimeout
'ShowDetailedCountersDeadline))
+ (let attrsRead (SelectRow attrsTable attrsRow attrsSelect))
+
(return (AsList
- (SetResult 'attrs (SelectRow attrsTable attrsRow attrsSelect))))
+ (SetResult 'queueExists (Exists attrsRead))
+ (SetResult 'attrs attrsRead)))
)
)__";
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 8b6d93f951e..9ef609c963e 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -556,8 +556,11 @@ const char* const InternalGetQueueAttributesQuery = R"__(
'VisibilityTimeout
'ShowDetailedCountersDeadline))
+ (let attrsRead (SelectRow attrsTable attrsRow attrsSelect))
+
(return (AsList
- (SetResult 'attrs (SelectRow attrsTable attrsRow attrsSelect))))
+ (SetResult 'queueExists (Exists attrsRead))
+ (SetResult 'attrs attrsRead)))
)
)__";
diff --git a/ydb/tests/functional/sqs/common/test_queues_managing.py b/ydb/tests/functional/sqs/common/test_queues_managing.py
index af8510b459b..aa87459ba77 100644
--- a/ydb/tests/functional/sqs/common/test_queues_managing.py
+++ b/ydb/tests/functional/sqs/common/test_queues_managing.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
+import logging
import pytest
from hamcrest import assert_that, equal_to, greater_than, not_none, none, has_item, has_items, raises, empty, instance_of
@@ -233,6 +234,28 @@ class QueuesManagingTest(KikimrSqsTestBase):
@pytest.mark.parametrize(**IS_FIFO_PARAMS)
@pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_request_to_deleted_queue(self, is_fifo, tables_format):
+ self._init_with_params(is_fifo, tables_format)
+
+ created_queue_url = self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo)
+ self._sqs_api.get_queue_attributes(created_queue_url)
+ delete_result = self._sqs_api.delete_queue(created_queue_url)
+ assert_that(
+ delete_result, not_none()
+ )
+
+ while True:
+ try:
+ self._sqs_api.get_queue_attributes(created_queue_url)
+ except Exception as e:
+ logging.info(f'error during getting attributes after deletion: {e}')
+ if 'The specified queue doesn\'t exist.' in str(e):
+ break
+ else:
+ assert tables_format != 1 # for tables_format = 0 : can be internal error (failed to resolve table)
+
+ @pytest.mark.parametrize(**IS_FIFO_PARAMS)
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
def test_purge_queue(self, is_fifo, tables_format):
self._init_with_params(is_fifo, tables_format)