summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsergeyveselov <[email protected]>2023-10-26 12:55:00 +0300
committersergeyveselov <[email protected]>2023-10-26 13:36:20 +0300
commit15d6675c4a740f2c42d5c77c477b233d492b283b (patch)
tree186da7ff9a5065bdd29eae6478027ef40662b3c8
parentff9e99596eacad8108b1b5045c7e92de08453be0 (diff)
Move action actors counters to queue leader in SQS
Move action actors counting to queue leader
-rw-r--r--ydb/core/protos/msgbus.proto7
-rw-r--r--ydb/core/ymq/actor/action.h52
-rw-r--r--ydb/core/ymq/actor/events.h6
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp38
-rw-r--r--ydb/core/ymq/actor/queue_leader.h1
-rw-r--r--ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py29
-rw-r--r--ydb/tests/functional/sqs/common/test_queue_counters.py28
7 files changed, 142 insertions, 19 deletions
diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto
index 41755259e1c..4d9290fd307 100644
--- a/ydb/core/protos/msgbus.proto
+++ b/ydb/core/protos/msgbus.proto
@@ -671,6 +671,13 @@ message TSqsResponse {
optional bool IsFifo = 33;
}
+message TSqsActionCounterChanged { // Payload of TSqsEvents::TEvActionCounterChanged: the outcome of one finished action, sent by an action actor to the queue leader.
+ optional uint32 Action = 1; // Numeric action id; the queue leader casts it to EAction after a range check.
+ optional uint32 ErrorsCount = 2; // Number of errors in the response; 0 makes the leader count the action as a success.
+ optional uint64 DurationMs = 3; // Request duration in milliseconds, fed into the Duration histogram.
+ optional uint64 WorkingDurationMs = 4; // Request working duration in milliseconds, fed into the WorkingDuration histogram.
+}
+
message TInterconnectDebug {
optional string Name = 1;
optional uint32 Channel = 2;
diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h
index 0dfaa5f361d..3782ffe2b10 100644
--- a/ydb/core/ymq/actor/action.h
+++ b/ydb/core/ymq/actor/action.h
@@ -249,31 +249,45 @@ protected:
void SendReplyAndDie() {
RLOG_SQS_TRACE("SendReplyAndDie from action actor " << Response_);
- auto actionCountersCouple = GetActionCounters();
auto* detailedCounters = UserCounters_ ? UserCounters_->GetDetailedCounters() : nullptr;
const size_t errors = ErrorsCount(Response_, detailedCounters ? &detailedCounters->APIStatuses : nullptr);
- if (actionCountersCouple.SqsCounters) {
- if (errors) {
- ADD_COUNTER(actionCountersCouple.SqsCounters, Errors, errors);
- } else {
- INC_COUNTER(actionCountersCouple.SqsCounters, Success);
+
+ const TDuration duration = GetRequestDuration();
+ const TDuration workingDuration = GetRequestWorkingDuration();
+ if (QueueLeader_ && (IsActionForQueue(Action_) || IsActionForQueueYMQ(Action_))) {
+ auto counterChangedEvent = MakeHolder<TSqsEvents::TEvActionCounterChanged>();
+ counterChangedEvent->Record.set_action(Action_);
+ counterChangedEvent->Record.set_durationms(duration.MilliSeconds());
+ counterChangedEvent->Record.set_workingdurationms(workingDuration.MilliSeconds());
+ counterChangedEvent->Record.set_errorscount(errors);
+ this->Send(QueueLeader_, counterChangedEvent.Release());
+ } else if (UserCounters_ && UserCounters_->NeedToShowDetailedCounters()) {
+ TCountersCouple<TActionCounters*> userCounters{nullptr, nullptr};
+ if (IsActionForUser(Action_)) {
+ userCounters.SqsCounters = &UserCounters_->SqsActionCounters[Action_];
+ if (errors) {
+ ADD_COUNTER(userCounters.SqsCounters, Errors, errors);
+ } else {
+ INC_COUNTER(userCounters.SqsCounters, Success);
+ }
}
- }
- if (actionCountersCouple.YmqCounters) {
- if (errors) {
- ADD_COUNTER(actionCountersCouple.YmqCounters, Errors, errors);
- } else {
- INC_COUNTER(actionCountersCouple.YmqCounters, Success);
+ if (IsActionForUserYMQ(Action_)) {
+ userCounters.YmqCounters = &UserCounters_->YmqActionCounters[Action_];
+ if (errors) {
+ ADD_COUNTER(userCounters.YmqCounters, Errors, errors);
+ } else {
+ INC_COUNTER(userCounters.YmqCounters, Success);
+ }
+ }
+
+ if (userCounters.Defined()) {
+ COLLECT_HISTOGRAM_COUNTER_COUPLE(userCounters, Duration, duration.MilliSeconds());
+ RLOG_SQS_DEBUG("Request " << Action_ << " working duration: " << workingDuration.MilliSeconds() << "ms");
+ COLLECT_HISTOGRAM_COUNTER_COUPLE(userCounters, WorkingDuration, workingDuration.MilliSeconds());
}
}
+
FinishTs_ = TActivationContext::Now();
- const TDuration workingDuration = GetRequestWorkingDuration();
- RLOG_SQS_DEBUG("Request " << Action_ << " working duration: " << workingDuration.MilliSeconds() << "ms");
- if (actionCountersCouple.Defined()) {
- const TDuration duration = GetRequestDuration();
- COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, Duration, duration.MilliSeconds());
- COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, WorkingDuration, workingDuration.MilliSeconds());
- }
if (IsRequestSlow()) {
PrintSlowRequestWarning();
}
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 4bf7e0b8c16..f8a6d07023d 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -136,6 +136,8 @@ struct TSqsEvents {
EvReloadStateResponse,
EvLeaderStarted,
+ EvActionCounterChanged,
+
EvEnd,
};
@@ -397,6 +399,10 @@ struct TSqsEvents {
}
};
+ struct TEvActionCounterChanged: public NActors::TEventPB<TEvActionCounterChanged, NKikimrClient::TSqsActionCounterChanged, EvActionCounterChanged> { // Sent by an action actor to the queue leader so that the leader updates the per-action counters
+ using TEventPB::TEventPB; // reuse the TEventPB constructors
+ };
+
// Request that is sent from proxy to sqs service actor on other (leader) node
struct TEvSqsRequest : public NActors::TEventPB<TEvSqsRequest, NKikimrClient::TSqsRequest, EvSqsRequest> {
using TEventPB::TEventPB;
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 1631301b091..e893157ecbe 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -126,6 +126,7 @@ STATEFN(TQueueLeader::StateInit) {
// interface
cFunc(TEvPoisonPill::EventType, PassAway); // from service
hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfigurationWhileIniting); // from action actors
+ hFunc(TSqsEvents::TEvActionCounterChanged, HandleActionCounterChanged);
hFunc(TSqsEvents::TEvExecute, HandleExecuteWhileIniting); // from action actors
hFunc(TSqsEvents::TEvClearQueueAttributesCache, HandleClearQueueAttributesCache); // from set queue attributes
hFunc(TSqsEvents::TEvPurgeQueue, HandlePurgeQueue); // from purge queue actor
@@ -152,6 +153,7 @@ STATEFN(TQueueLeader::StateWorking) {
// interface
cFunc(TEvPoisonPill::EventType, PassAway); // from service
hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfigurationWhileWorking); // from action actors
+ hFunc(TSqsEvents::TEvActionCounterChanged, HandleActionCounterChanged);
hFunc(TSqsEvents::TEvExecute, HandleExecuteWhileWorking); // from action actors
hFunc(TSqsEvents::TEvClearQueueAttributesCache, HandleClearQueueAttributesCache); // from set queue attributes
hFunc(TSqsEvents::TEvPurgeQueue, HandlePurgeQueue); // from purge queue actor
@@ -346,6 +348,42 @@ void TQueueLeader::HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfigur
}
}
+// Applies the outcome of one finished action, reported by an action actor, to this queue's
+// action counters. The event record carries the numeric action id, the number of errors in the
+// response, and the request duration / working duration in milliseconds.
+//
+// Events with an out-of-range action id are logged and ignored. Events for non-message actions
+// are ignored unless detailed counters are enabled for this queue.
+void TQueueLeader::HandleActionCounterChanged(TSqsEvents::TEvActionCounterChanged::TPtr& ev) {
+    const auto& record = ev->Get()->Record;
+    const auto actionNumber = record.GetAction();
+    // The id is used below as an index into SqsActionCounters / YmqActionCounters, so the value
+    // ActionsArraySize itself must be rejected as well (hence >=, not >).
+    // NOTE(review): assumes ActionsArraySize is the element count of those arrays - confirm
+    // against the counters declaration.
+    if (actionNumber >= EAction::ActionsArraySize) {
+        LOG_SQS_DEBUG("Action with number " << actionNumber << " doesn't exist.");
+        return;
+    }
+    const EAction action = static_cast<EAction>(actionNumber);
+    if (!IsActionForMessage(action) && !Counters_->NeedToShowDetailedCounters()) {
+        return;
+    }
+    const ui32 errorsCount = record.GetErrorsCount();
+    TCountersCouple<TActionCounters*> actionCountersCouple{nullptr, nullptr};
+    // Each counter family (SQS / YMQ) is updated only if the action belongs to it:
+    // a response with errors adds to Errors, an error-free response increments Success.
+    if (IsActionForQueue(action)) {
+        actionCountersCouple.SqsCounters = &Counters_->SqsActionCounters[action];
+        if (errorsCount > 0) {
+            ADD_COUNTER(actionCountersCouple.SqsCounters, Errors, errorsCount);
+        } else {
+            INC_COUNTER(actionCountersCouple.SqsCounters, Success);
+        }
+    }
+    if (IsActionForQueueYMQ(action)) {
+        actionCountersCouple.YmqCounters = &Counters_->YmqActionCounters[action];
+        if (errorsCount > 0) {
+            ADD_COUNTER(actionCountersCouple.YmqCounters, Errors, errorsCount);
+        } else {
+            INC_COUNTER(actionCountersCouple.YmqCounters, Success);
+        }
+    }
+    // Duration histograms are collected only when at least one counter family was selected above.
+    if (actionCountersCouple.Defined()) {
+        COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, Duration, record.GetDurationMs());
+        const auto workingDuration = record.GetWorkingDurationMs();
+        LOG_SQS_DEBUG("Request " << action << " working duration: " << workingDuration << "ms");
+        COLLECT_HISTOGRAM_COUNTER_COUPLE(actionCountersCouple, WorkingDuration, workingDuration);
+    }
+}
+
void TQueueLeader::HandleClearQueueAttributesCache([[maybe_unused]] TSqsEvents::TEvClearQueueAttributesCache::TPtr& ev) {
AttributesUpdateTime_ = TInstant::Zero();
QueueAttributes_ = Nothing();
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index 2bda1d763ae..1357179c615 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -60,6 +60,7 @@ private:
void HandleState(const TSqsEvents::TEvExecuted::TRecord& ev);
void HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev);
void HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfiguration::TPtr& ev);
+ void HandleActionCounterChanged(TSqsEvents::TEvActionCounterChanged::TPtr& ev);
void HandleExecuteWhileIniting(TSqsEvents::TEvExecute::TPtr& ev);
void HandleExecuteWhileWorking(TSqsEvents::TEvExecute::TPtr& ev);
void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev);
diff --git a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py
index 44431dfd06e..244db79d05d 100644
--- a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py
+++ b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_queue_counters.py
@@ -140,3 +140,32 @@ class TestYmqQueueCounters(get_test_with_sqs_tenant_installation(KikimrSqsTestBa
'name': 'queue.messages.empty_receive_attempts_count_per_second',
})
assert receive_message_empty_count == 1
+
+ def test_sqs_action_counters(self):
+
+ self._sqs_api = self._create_api_for_user(self._username, raise_on_error=True, force_private=True, iam_token=self.iam_token, folder_id=self.folder_id)
+
+ queue_url = self._sqs_api.create_queue(self.queue_name)
+ queue_resource_id = self._get_queue_resource_id(queue_url, self.queue_name)
+
+ message_payload = "foobar"
+
+ self._sqs_api.send_message(queue_url, message_payload)
+ self._read_while_not_empty(queue_url, 1)
+
+ ymq_counters = self._get_ymq_counters(cloud=self.cloud_id, folder=self.folder_id)
+
+ successes = self._get_counter_value(ymq_counters, {
+ 'queue': queue_resource_id,
+ 'method': 'receive_message',
+ 'name': 'api.http.requests_count_per_second',
+ })
+ assert successes == 1
+
+ durations = self._get_counter(ymq_counters, {
+ 'queue': queue_resource_id,
+ 'method': 'receive_message',
+ 'name': 'api.http.request_duration_milliseconds',
+ })
+ duration_buckets = durations['hist']['buckets']
+ assert any(map(lambda x: x > 0, duration_buckets))
diff --git a/ydb/tests/functional/sqs/common/test_queue_counters.py b/ydb/tests/functional/sqs/common/test_queue_counters.py
index 8bf6fb89e9e..bb00929d101 100644
--- a/ydb/tests/functional/sqs/common/test_queue_counters.py
+++ b/ydb/tests/functional/sqs/common/test_queue_counters.py
@@ -81,3 +81,31 @@ class TestSqsGettingCounters(KikimrSqsTestBase):
'sensor': 'ReceiveMessage_EmptyCount',
})
assert receive_message_empty_count == 1
+
+ def test_sqs_action_counters(self):
+ queue_url = self._create_queue_and_assert(self.queue_name, False, True)
+ message_payload = "foobar"
+ self._sqs_api.send_message(queue_url, message_payload)
+ self._read_while_not_empty(queue_url, 1)
+
+ sqs_counters = self._get_sqs_counters()
+
+ successes = self._get_counter_value(sqs_counters, {
+ 'queue': self.queue_name,
+ 'sensor': 'ReceiveMessage_Success',
+ })
+ assert successes == 1
+
+ durations = self._get_counter(sqs_counters, {
+ 'queue': self.queue_name,
+ 'sensor': 'ReceiveMessage_Duration',
+ })
+ duration_buckets = durations['hist']['buckets']
+ assert any(map(lambda x: x > 0, duration_buckets))
+
+ working_durations = self._get_counter(sqs_counters, {
+ 'queue': self.queue_name,
+ 'sensor': 'ReceiveMessage_WorkingDuration',
+ })
+ working_duration_buckets = working_durations['hist']['buckets']
+ assert any(map(lambda x: x > 0, working_duration_buckets))