diff options
author | sergeyveselov <[email protected]> | 2023-10-26 12:55:00 +0300 |
---|---|---|
committer | sergeyveselov <[email protected]> | 2023-10-26 13:36:20 +0300 |
commit | 15d6675c4a740f2c42d5c77c477b233d492b283b (patch) | |
tree | 186da7ff9a5065bdd29eae6478027ef40662b3c8 | |
parent | ff9e99596eacad8108b1b5045c7e92de08453be0 (diff) |
Move action actors counters to queue leader in SQS
Move action actors counting to queue leader
-rw-r--r-- | ydb/core/protos/msgbus.proto | 7 | ||||
-rw-r--r-- | ydb/core/ymq/actor/action.h | 52 | ||||
-rw-r--r-- | ydb/core/ymq/actor/events.h | 6 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 38 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 1 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py | 29 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/common/test_queue_counters.py | 28 |
7 files changed, 142 insertions, 19 deletions
diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index 41755259e1c..4d9290fd307 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -671,6 +671,13 @@ message TSqsResponse { optional bool IsFifo = 33; } +message TSqsActionCounterChanged { + optional uint32 Action = 1; + optional uint32 ErrorsCount = 2; + optional uint64 DurationMs = 3; + optional uint64 WorkingDurationMs = 4; +} + message TInterconnectDebug { optional string Name = 1; optional uint32 Channel = 2; diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index 0dfaa5f361d..3782ffe2b10 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -249,31 +249,45 @@ protected: void SendReplyAndDie() { RLOG_SQS_TRACE("SendReplyAndDie from action actor " << Response_); - auto actionCountersCouple = GetActionCounters(); auto* detailedCounters = UserCounters_ ? UserCounters_->GetDetailedCounters() : nullptr; const size_t errors = ErrorsCount(Response_, detailedCounters ? &detailedCounters->APIStatuses : nullptr); - if (actionCountersCouple.SqsCounters) { - if (errors) { - ADD_COUNTER(actionCountersCouple.SqsCounters, Errors, errors); - } else { - INC_COUNTER(actionCountersCouple.SqsCounters, Success); + + const TDuration duration = GetRequestDuration(); + const TDuration workingDuration = GetRequestWorkingDuration(); + if (QueueLeader_ && (IsActionForQueue(Action_) || IsActionForQueueYMQ(Action_))) { + auto counterChangedEvent = MakeHolder<TSqsEvents::TEvActionCounterChanged>(); + counterChangedEvent->Record.set_action(Action_); + counterChangedEvent->Record.set_durationms(duration.MilliSeconds()); + counterChangedEvent->Record.set_workingdurationms(workingDuration.MilliSeconds()); + counterChangedEvent->Record.set_errorscount(errors); + this->Send(QueueLeader_, counterChangedEvent.Release()); + } else if (UserCounters_ && UserCounters_->NeedToShowDetailedCounters()) { + TCountersCouple<TActionCounters*> userCounters{nullptr, nullptr}; + if (IsActionForUser(Action_)) { + userCounters.SqsCounters = &UserCounters_->SqsActionCounters[Action_]; + if (errors) { + ADD_COUNTER(userCounters.SqsCounters, Errors, errors); + } else { + INC_COUNTER(userCounters.SqsCounters, Success); + } } - } - if (actionCountersCouple.YmqCounters) { - if (errors) { - ADD_COUNTER(actionCountersCouple.YmqCounters, Errors, errors); - } else { - INC_COUNTER(actionCountersCouple.YmqCounters, Success); + if (IsActionForUserYMQ(Action_)) { + userCounters.YmqCounters = &UserCounters_->YmqActionCounters[Action_]; + if (errors) { + ADD_COUNTER(userCounters.YmqCounters, Errors, errors); + } else { + INC_COUNTER(userCounters.YmqCounters, Success); + } + } + + if (userCounters.Defined()) { + COLLECT_HISTOGRAM_COUNTER_COUPLE(userCounters, Duration, duration.MilliSeconds()); + RLOG_SQS_DEBUG("Request " << Action_ << " working duration: " << workingDuration.MilliSeconds() << "ms"); + COLLECT_HISTOGRAM_COUNTER_COUPLE(userCounters, WorkingDuration, workingDuration.MilliSeconds()); } } + FinishTs_ = TActivationContext::Now(); - const TDuration workingDuration = GetRequestWorkingDuration(); - RLOG_SQS_DEBUG("Request " << Action_ << " working duration: " << workingDuration.MilliSeconds() << "ms"); - if (actionCountersCouple.Defined()) { - const TDuration duration = GetRequestDuration(); - COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, Duration, duration.MilliSeconds()); - COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, WorkingDuration, workingDuration.MilliSeconds()); - } if (IsRequestSlow()) { PrintSlowRequestWarning(); } diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index 4bf7e0b8c16..f8a6d07023d 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -136,6 +136,8 @@ struct TSqsEvents { EvReloadStateResponse, EvLeaderStarted, + EvActionCounterChanged, + EvEnd, }; @@ -397,6 +399,10 @@ struct TSqsEvents { } }; + struct TEvActionCounterChanged: public NActors::TEventPB<TEvActionCounterChanged, NKikimrClient::TSqsActionCounterChanged, EvActionCounterChanged> { + using TEventPB::TEventPB; + }; + // Request that is sent from proxy to sqs service actor on other (leader) node struct TEvSqsRequest : public NActors::TEventPB<TEvSqsRequest, NKikimrClient::TSqsRequest, EvSqsRequest> { using TEventPB::TEventPB; diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 1631301b091..e893157ecbe 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -126,6 +126,7 @@ STATEFN(TQueueLeader::StateInit) { // interface cFunc(TEvPoisonPill::EventType, PassAway); // from service hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfigurationWhileIniting); // from action actors + hFunc(TSqsEvents::TEvActionCounterChanged, HandleActionCounterChanged); hFunc(TSqsEvents::TEvExecute, HandleExecuteWhileIniting); // from action actors hFunc(TSqsEvents::TEvClearQueueAttributesCache, HandleClearQueueAttributesCache); // from set queue attributes hFunc(TSqsEvents::TEvPurgeQueue, HandlePurgeQueue); // from purge queue actor @@ -152,6 +153,7 @@ STATEFN(TQueueLeader::StateWorking) { // interface cFunc(TEvPoisonPill::EventType, PassAway); // from service hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfigurationWhileWorking); // from action actors + hFunc(TSqsEvents::TEvActionCounterChanged, HandleActionCounterChanged); hFunc(TSqsEvents::TEvExecute, HandleExecuteWhileWorking); // from action actors hFunc(TSqsEvents::TEvClearQueueAttributesCache, HandleClearQueueAttributesCache); // from set queue attributes hFunc(TSqsEvents::TEvPurgeQueue, HandlePurgeQueue); // from purge queue actor @@ -346,6 +348,42 @@ void TQueueLeader::HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfigur } } +void TQueueLeader::HandleActionCounterChanged(TSqsEvents::TEvActionCounterChanged::TPtr& ev) { + auto actionNumber = ev->Get()->Record.GetAction(); + if (actionNumber > EAction::ActionsArraySize) { + LOG_SQS_DEBUG("Action with number " << actionNumber << " doesn't exist."); + return; + } + EAction action = static_cast<EAction>(actionNumber); + if (!IsActionForMessage(action) && !Counters_->NeedToShowDetailedCounters()) { + return; + } + ui32 errorsCount = ev->Get()->Record.GetErrorsCount(); + TCountersCouple<TActionCounters*> actionCountersCouple{nullptr, nullptr}; + if (IsActionForQueue(action)) { + actionCountersCouple.SqsCounters = &Counters_->SqsActionCounters[action]; + if (errorsCount > 0) { + ADD_COUNTER(actionCountersCouple.SqsCounters, Errors, errorsCount); + } else { + INC_COUNTER(actionCountersCouple.SqsCounters, Success); + } + } + if (IsActionForQueueYMQ(action)) { + actionCountersCouple.YmqCounters = &Counters_->YmqActionCounters[action]; + if (errorsCount > 0) { + ADD_COUNTER(actionCountersCouple.YmqCounters, Errors, errorsCount); + } else { + INC_COUNTER(actionCountersCouple.YmqCounters, Success); + } + } + if (actionCountersCouple.Defined()) { + COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, Duration, ev->Get()->Record.GetDurationMs()); + auto workingDuration = ev->Get()->Record.GetWorkingDurationMs(); + LOG_SQS_DEBUG("Request " << action << " working duration: " << workingDuration << "ms"); + COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, WorkingDuration, workingDuration); + } +} + void TQueueLeader::HandleClearQueueAttributesCache([[maybe_unused]] TSqsEvents::TEvClearQueueAttributesCache::TPtr& ev) { AttributesUpdateTime_ = TInstant::Zero(); QueueAttributes_ = Nothing(); diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index 2bda1d763ae..1357179c615 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -60,6 +60,7 @@ private: void HandleState(const TSqsEvents::TEvExecuted::TRecord& ev); void HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev); void HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfiguration::TPtr& ev); + void HandleActionCounterChanged(TSqsEvents::TEvActionCounterChanged::TPtr& ev); void HandleExecuteWhileIniting(TSqsEvents::TEvExecute::TPtr& ev); void HandleExecuteWhileWorking(TSqsEvents::TEvExecute::TPtr& ev); void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev); diff --git a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py index 44431dfd06e..244db79d05d 100644 --- a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py +++ b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py @@ -140,3 +140,32 @@ class TestYmqQueueCounters(get_test_with_sqs_tenant_installation(KikimrSqsTestBa 'name': 'queue.messages.empty_receive_attempts_count_per_second', }) assert receive_message_empty_count == 1 + + def test_sqs_action_counters(self): + + self._sqs_api = self._create_api_for_user(self._username, raise_on_error=True, force_private=True, iam_token=self.iam_token, folder_id=self.folder_id) + + queue_url = self._sqs_api.create_queue(self.queue_name) + queue_resource_id = self._get_queue_resource_id(queue_url, self.queue_name) + + message_payload = "foobar" + + self._sqs_api.send_message(queue_url, message_payload) + self._read_while_not_empty(queue_url, 1) + + ymq_counters = self._get_ymq_counters(cloud=self.cloud_id, folder=self.folder_id) + + successes = self._get_counter_value(ymq_counters, { + 'queue': queue_resource_id, + 'method': 'receive_message', + 'name': 'api.http.requests_count_per_second', + }) + assert successes == 1 + + durations = self._get_counter(ymq_counters, { + 'queue': queue_resource_id, + 'method': 'receive_message', + 'name': 'api.http.request_duration_milliseconds', + }) + duration_buckets = durations['hist']['buckets'] + assert any(map(lambda x: x > 0, duration_buckets)) diff --git a/ydb/tests/functional/sqs/common/test_queue_counters.py b/ydb/tests/functional/sqs/common/test_queue_counters.py index 8bf6fb89e9e..bb00929d101 100644 --- a/ydb/tests/functional/sqs/common/test_queue_counters.py +++ b/ydb/tests/functional/sqs/common/test_queue_counters.py @@ -81,3 +81,31 @@ class TestSqsGettingCounters(KikimrSqsTestBase): 'sensor': 'ReceiveMessage_EmptyCount', }) assert receive_message_empty_count == 1 + + def test_sqs_action_counters(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + message_payload = "foobar" + self._sqs_api.send_message(queue_url, message_payload) + self._read_while_not_empty(queue_url, 1) + + sqs_counters = self._get_sqs_counters() + + successes = self._get_counter_value(sqs_counters, { + 'queue': self.queue_name, + 'sensor': 'ReceiveMessage_Success', + }) + assert successes == 1 + + durations = self._get_counter(sqs_counters, { + 'queue': self.queue_name, + 'sensor': 'ReceiveMessage_Duration', + }) + duration_buckets = durations['hist']['buckets'] + assert any(map(lambda x: x > 0, duration_buckets)) + + working_durations = self._get_counter(sqs_counters, { + 'queue': self.queue_name, + 'sensor': 'ReceiveMessage_WorkingDuration', + }) + working_duration_buckets = working_durations['hist']['buckets'] + assert any(map(lambda x: x > 0, working_duration_buckets)) |