diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-04-18 17:55:37 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-04-18 17:55:37 +0300 |
commit | a2c46b2d7a9bd81e2240086438724e37d3d936db (patch) | |
tree | ad7049ae435f88a4dc2e2ea418f2698f007fe67d | |
parent | e318f0163f16e2a3e5a139c5fd8170fa7dd4af01 (diff) | |
download | ydb-a2c46b2d7a9bd81e2240086438724e37d3d936db.tar.gz |
[ymq] detect that queue deleted on GetQueueAttributes
Возможен рейс, когда лидер очереди еще жив, но очередь удалена.
Необходимо поддержать в ответе mimiKQL информацию об этом и в обработчике поддерживаем ответ пользователю об отсутствии очереди.
В рамках этого ревью проверка что очередь удален производиться для запросов на получение атрибутов очереди.
-rw-r--r-- | ydb/core/ymq/actor/get_queue_attributes.cpp | 78 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 98 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/queues/fifo/queries.cpp | 5 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 5 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/common/test_queues_managing.py | 23 |
6 files changed, 128 insertions, 83 deletions
diff --git a/ydb/core/ymq/actor/get_queue_attributes.cpp b/ydb/core/ymq/actor/get_queue_attributes.cpp index c9f7ec3084e..5dacb354c01 100644 --- a/ydb/core/ymq/actor/get_queue_attributes.cpp +++ b/ydb/core/ymq/actor/get_queue_attributes.cpp @@ -182,51 +182,55 @@ private: const auto& record = ev->Get()->Record; const ui32 status = record.GetStatus(); auto* result = Response_.MutableGetQueueAttributes(); + bool queueExists = true; if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { const TValue val(TValue::Create(record.GetExecutionEngineEvaluatedResponse())); - const TValue& attrs(val["attrs"]); + queueExists = val["queueExists"]; + if (queueExists) { + const TValue& attrs(val["attrs"]); - if (HasAttributeName("ContentBasedDeduplication")) { - result->SetContentBasedDeduplication(bool(attrs["ContentBasedDeduplication"])); - } - if (HasAttributeName("DelaySeconds")) { - result->SetDelaySeconds(TDuration::MilliSeconds(ui64(attrs["DelaySeconds"])).Seconds()); - } - if (HasAttributeName("FifoQueue")) { - result->SetFifoQueue(bool(attrs["FifoQueue"])); - } - if (HasAttributeName("MaximumMessageSize")) { - result->SetMaximumMessageSize(ui64(attrs["MaximumMessageSize"])); - } - if (HasAttributeName("MessageRetentionPeriod")) { - result->SetMessageRetentionPeriod(TDuration::MilliSeconds(ui64(attrs["MessageRetentionPeriod"])).Seconds()); - } - if (HasAttributeName("ReceiveMessageWaitTimeSeconds")) { - result->SetReceiveMessageWaitTimeSeconds(TDuration::MilliSeconds(ui64(attrs["ReceiveMessageWaitTime"])).Seconds()); - } - if (HasAttributeName("VisibilityTimeout")) { - result->SetVisibilityTimeout(TDuration::MilliSeconds(ui64(attrs["VisibilityTimeout"])).Seconds()); - } - if (HasAttributeName("RedrivePolicy")) { - const TValue& dlqArn(attrs["DlqArn"]); - if (dlqArn.HaveValue() && !TString(dlqArn).empty()) { - // the attributes can't be set separately, so we check only one - TRedrivePolicy redrivePolicy; - redrivePolicy.TargetArn = TString(dlqArn); - redrivePolicy.MaxReceiveCount = ui64(attrs["MaxReceiveCount"]); - result->SetRedrivePolicy(redrivePolicy.ToJson()); + if (HasAttributeName("ContentBasedDeduplication")) { + result->SetContentBasedDeduplication(bool(attrs["ContentBasedDeduplication"])); + } + if (HasAttributeName("DelaySeconds")) { + result->SetDelaySeconds(TDuration::MilliSeconds(ui64(attrs["DelaySeconds"])).Seconds()); + } + if (HasAttributeName("FifoQueue")) { + result->SetFifoQueue(bool(attrs["FifoQueue"])); + } + if (HasAttributeName("MaximumMessageSize")) { + result->SetMaximumMessageSize(ui64(attrs["MaximumMessageSize"])); } + if (HasAttributeName("MessageRetentionPeriod")) { + result->SetMessageRetentionPeriod(TDuration::MilliSeconds(ui64(attrs["MessageRetentionPeriod"])).Seconds()); + } + if (HasAttributeName("ReceiveMessageWaitTimeSeconds")) { + result->SetReceiveMessageWaitTimeSeconds(TDuration::MilliSeconds(ui64(attrs["ReceiveMessageWaitTime"])).Seconds()); + } + if (HasAttributeName("VisibilityTimeout")) { + result->SetVisibilityTimeout(TDuration::MilliSeconds(ui64(attrs["VisibilityTimeout"])).Seconds()); + } + if (HasAttributeName("RedrivePolicy")) { + const TValue& dlqArn(attrs["DlqArn"]); + if (dlqArn.HaveValue() && !TString(dlqArn).empty()) { + // the attributes can't be set separately, so we check only one + TRedrivePolicy redrivePolicy; + redrivePolicy.TargetArn = TString(dlqArn); + redrivePolicy.MaxReceiveCount = ui64(attrs["MaxReceiveCount"]); + result->SetRedrivePolicy(redrivePolicy.ToJson()); + } + } + + --WaitCount_; + ReplyIfReady(); + return; } - } else { - RLOG_SQS_ERROR("Get queue attributes query failed"); - MakeError(result, NErrors::INTERNAL_FAILURE); - SendReplyAndDie(); - return; } - --WaitCount_; - ReplyIfReady(); + RLOG_SQS_ERROR("Get queue attributes query failed, queue exists: " << queueExists << ", answer: " << record); + MakeError(result, queueExists ? NErrors::INTERNAL_FAILURE : NErrors::NON_EXISTENT_QUEUE); + SendReplyAndDie(); } void HandleRuntimeAttributes(TSqsEvents::TEvGetRuntimeQueueAttributesResponse::TPtr& ev) { diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 32472412e88..199119264d1 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -1365,15 +1365,21 @@ void TQueueLeader::AnswerGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr& Send(req->Sender, std::move(resp)); } -void TQueueLeader::AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev) { +void TQueueLeader::AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev, bool queueRemoved) { auto answer = MakeHolder<TSqsEvents::TEvConfiguration>(); answer->RootUrl = RootUrl_; answer->SqsCoreCounters = Counters_->RootCounters.SqsCounters; answer->QueueCounters = Counters_; answer->UserCounters = UserCounters_; - answer->Fail = true; + answer->SchemeCache = SchemeCache_; answer->QuoterResources = QuoterResources_; + if (queueRemoved) { + answer->UserExists = true; + answer->QueueExists = false; + } else { + answer->Fail = true; + } Send(ev->Sender, answer.Release()); } @@ -1512,54 +1518,60 @@ void TQueueLeader::AskQueueAttributes() { void TQueueLeader::OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev) { const ui32 status = ev.GetStatus(); + bool queueExists = true; if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { using NKikimr::NClient::TValue; const TValue val(TValue::Create(ev.GetExecutionEngineEvaluatedResponse())); - const TValue& attrs(val["attrs"]); - - TSqsEvents::TQueueAttributes attributes; - attributes.ContentBasedDeduplication = attrs["ContentBasedDeduplication"]; - attributes.DelaySeconds = TDuration::MilliSeconds(attrs["DelaySeconds"]); - attributes.FifoQueue = attrs["FifoQueue"]; - attributes.MaximumMessageSize = attrs["MaximumMessageSize"]; - attributes.MessageRetentionPeriod = TDuration::MilliSeconds(attrs["MessageRetentionPeriod"]); - attributes.ReceiveMessageWaitTime = TDuration::MilliSeconds(attrs["ReceiveMessageWaitTime"]); - attributes.VisibilityTimeout = TDuration::MilliSeconds(attrs["VisibilityTimeout"]); - - const TValue showDetailedCountersDeadline = attrs["ShowDetailedCountersDeadline"]; - if (showDetailedCountersDeadline.HaveValue()) { - const ui64 ms = showDetailedCountersDeadline; - Counters_->ShowDetailedCounters(TInstant::MilliSeconds(ms)); - } - - // update dead letter queue info - const auto& dlqNameVal(attrs["DlqName"]); - const auto& maxReceiveCountVal(attrs["MaxReceiveCount"]); - if (dlqNameVal.HaveValue() && maxReceiveCountVal.HaveValue()) { - TTargetDlqInfo info; - info.DlqName = TString(dlqNameVal); - info.MaxReceiveCount = ui64(maxReceiveCountVal); - if (info.DlqName && info.MaxReceiveCount) { - DlqInfo_ = info; - // now we have to discover queue id and version - Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetQueueId("DLQ", UserName_, info.DlqName, FolderId_)); - } else { - DlqInfo_.Clear(); + + queueExists = val["queueExists"]; + if (queueExists) { + const TValue& attrs(val["attrs"]); + + TSqsEvents::TQueueAttributes attributes; + attributes.ContentBasedDeduplication = attrs["ContentBasedDeduplication"]; + attributes.DelaySeconds = TDuration::MilliSeconds(attrs["DelaySeconds"]); + attributes.FifoQueue = attrs["FifoQueue"]; + attributes.MaximumMessageSize = attrs["MaximumMessageSize"]; + attributes.MessageRetentionPeriod = TDuration::MilliSeconds(attrs["MessageRetentionPeriod"]); + attributes.ReceiveMessageWaitTime = TDuration::MilliSeconds(attrs["ReceiveMessageWaitTime"]); + attributes.VisibilityTimeout = TDuration::MilliSeconds(attrs["VisibilityTimeout"]); + + const TValue showDetailedCountersDeadline = attrs["ShowDetailedCountersDeadline"]; + if (showDetailedCountersDeadline.HaveValue()) { + const ui64 ms = showDetailedCountersDeadline; + Counters_->ShowDetailedCounters(TInstant::MilliSeconds(ms)); } - } - QueueAttributes_ = attributes; - AttributesUpdateTime_ = TActivationContext::Now(); - for (auto& req : GetConfigurationRequests_) { - AnswerGetConfiguration(req); - } - GetConfigurationRequests_.clear(); - } else { - for (auto& req : GetConfigurationRequests_) { - AnswerFailed(req); + // update dead letter queue info + const auto& dlqNameVal(attrs["DlqName"]); + const auto& maxReceiveCountVal(attrs["MaxReceiveCount"]); + if (dlqNameVal.HaveValue() && maxReceiveCountVal.HaveValue()) { + TTargetDlqInfo info; + info.DlqName = TString(dlqNameVal); + info.MaxReceiveCount = ui64(maxReceiveCountVal); + if (info.DlqName && info.MaxReceiveCount) { + DlqInfo_ = info; + // now we have to discover queue id and version + Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetQueueId("DLQ", UserName_, info.DlqName, FolderId_)); + } else { + DlqInfo_.Clear(); + } + } + + QueueAttributes_ = attributes; + AttributesUpdateTime_ = TActivationContext::Now(); + for (auto& req : GetConfigurationRequests_) { + AnswerGetConfiguration(req); + } + GetConfigurationRequests_.clear(); + return; } - GetConfigurationRequests_.clear(); } + + for (auto& req : GetConfigurationRequests_) { + AnswerFailed(req, !queueExists); + } + GetConfigurationRequests_.clear(); } void TQueueLeader::HandleQueueId(TSqsEvents::TEvQueueId::TPtr& ev) { diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index ef68e5c0d89..6178288ddbf 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -100,7 +100,7 @@ private: void RemoveCachedRequest(size_t shard, size_t idx); void CreateBackgroundActors(); void AnswerGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr& req); - void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev); + void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev, bool queueRemoved = false); void AskQueueAttributes(); void OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev); void MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason); diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp index 424fcf5c856..60c080198ca 100644 --- a/ydb/core/ymq/queues/fifo/queries.cpp +++ b/ydb/core/ymq/queues/fifo/queries.cpp @@ -533,8 +533,11 @@ const char* const InternalGetQueueAttributesQuery = R"__( 'VisibilityTimeout 'ShowDetailedCountersDeadline)) + (let attrsRead (SelectRow attrsTable attrsRow attrsSelect)) + (return (AsList - (SetResult 'attrs (SelectRow attrsTable attrsRow attrsSelect)))) + (SetResult 'queueExists (Exists attrsRead)) + (SetResult 'attrs attrsRead))) ) )__"; diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index 8b6d93f951e..9ef609c963e 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -556,8 +556,11 @@ const char* const InternalGetQueueAttributesQuery = R"__( 'VisibilityTimeout 'ShowDetailedCountersDeadline)) + (let attrsRead (SelectRow attrsTable attrsRow attrsSelect)) + (return (AsList - (SetResult 'attrs (SelectRow attrsTable attrsRow attrsSelect)))) + (SetResult 'queueExists (Exists attrsRead)) + (SetResult 'attrs attrsRead))) ) )__"; diff --git a/ydb/tests/functional/sqs/common/test_queues_managing.py b/ydb/tests/functional/sqs/common/test_queues_managing.py index af8510b459b..aa87459ba77 100644 --- a/ydb/tests/functional/sqs/common/test_queues_managing.py +++ b/ydb/tests/functional/sqs/common/test_queues_managing.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import time +import logging import pytest from hamcrest import assert_that, equal_to, greater_than, not_none, none, has_item, has_items, raises, empty, instance_of @@ -233,6 +234,28 @@ class QueuesManagingTest(KikimrSqsTestBase): @pytest.mark.parametrize(**IS_FIFO_PARAMS) @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_request_to_deleted_queue(self, is_fifo, tables_format): + self._init_with_params(is_fifo, tables_format) + + created_queue_url = self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo) + self._sqs_api.get_queue_attributes(created_queue_url) + delete_result = self._sqs_api.delete_queue(created_queue_url) + assert_that( + delete_result, not_none() + ) + + while True: + try: + self._sqs_api.get_queue_attributes(created_queue_url) + except Exception as e: + logging.info(f'error during getting attributes after deletion: {e}') + if 'The specified queue doesn\'t exist.' in str(e): + break + else: + assert tables_format != 1 # for tables_format = 0 : can be internal error (failed to resolve table) + + @pytest.mark.parametrize(**IS_FIFO_PARAMS) + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) def test_purge_queue(self, is_fifo, tables_format): self._init_with_params(is_fifo, tables_format) |