aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-06-01 11:15:27 +0300
committeralexbogo <alexbogo@ydb.tech>2023-06-01 11:15:27 +0300
commitad69a1b560522eea94ab9632a6b7176f8e8731df (patch)
treec7515e23b9ecc8a36e69e8598cbd8c4960bae989
parent859bb5ff4cd6798053dbbf6e8440a4b124d6678d (diff)
downloadydb-ad69a1b560522eea94ab9632a6b7176f8e8731df.tar.gz
[ymq] reload DLQ state on request
init
-rw-r--r--ydb/core/ymq/actor/events.h21
-rw-r--r--ydb/core/ymq/actor/proxy_service.cpp136
-rw-r--r--ydb/core/ymq/actor/proxy_service.h30
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp63
-rw-r--r--ydb/core/ymq/actor/queue_leader.h7
-rw-r--r--ydb/core/ymq/actor/service.cpp16
-rw-r--r--ydb/core/ymq/actor/service.h1
-rw-r--r--ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt13
-rw-r--r--ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt13
-rw-r--r--ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt13
-rw-r--r--ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt13
-rw-r--r--ydb/core/ymq/proto/events.proto15
12 files changed, 319 insertions, 22 deletions
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 0132e1e0d33..f500e443916 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -14,6 +14,7 @@
#include <ydb/core/ymq/base/processed_request_attributes.h>
#include <ydb/core/ymq/base/query_id.h>
#include <ydb/core/ymq/base/queue_path.h>
+#include <ydb/core/ymq/proto/events.pb.h>
#include <ydb/core/ymq/proto/records.pb.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -131,6 +132,8 @@ struct TSqsEvents {
EvNodeTrackerSubscriptionStatus,
EvForceReloadState,
+ EvReloadStateRequest,
+ EvReloadStateResponse,
EvEnd,
};
@@ -945,6 +948,24 @@ struct TSqsEvents {
{}
TDuration NextTryAfter;
};
+
+ struct TEvReloadStateRequest : public NActors::TEventPB<TEvReloadStateRequest, TReloadStateRequest, EvReloadStateRequest> {
+ TEvReloadStateRequest() = default;
+
+ TEvReloadStateRequest(const TString& user, const TString& queue) {
+ Record.MutableTarget()->SetUserName(user);
+ Record.MutableTarget()->SetQueueName(queue);
+ }
+ };
+
+ struct TEvReloadStateResponse : public NActors::TEventPB<TEvReloadStateResponse, TReloadStateResponse, EvReloadStateResponse> {
+ TEvReloadStateResponse() = default;
+ TEvReloadStateResponse(const TString& user, const TString& queue, TInstant reloadedAt) {
+ Record.MutableWho()->SetUserName(user);
+ Record.MutableWho()->SetQueueName(queue);
+ Record.SetReloadedAtMs(reloadedAt.MilliSeconds());
+ }
+ };
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/proxy_service.cpp b/ydb/core/ymq/actor/proxy_service.cpp
index 4990ac55823..ea3e096e042 100644
--- a/ydb/core/ymq/actor/proxy_service.cpp
+++ b/ydb/core/ymq/actor/proxy_service.cpp
@@ -24,6 +24,8 @@ using NKikimr::NClient::TValue;
namespace NKikimr::NSQS {
+constexpr TDuration RESEND_RELOAD_REQUEST_PERIOD = TDuration::Seconds(5);
+
struct TSqsProxyService::TNodeInfo : public TAtomicRefCount<TNodeInfo> {
explicit TNodeInfo(ui32 nodeId)
: NodeId(nodeId)
@@ -53,6 +55,9 @@ void TSqsProxyService::Bootstrap() {
SqsCounters_ = GetSqsServiceCounters(AppData()->Counters, "core");
YmqPublicCounters_ = GetYmqPublicCounters(AppData()->Counters);
+
+ ReloadStateRequestId_ = CreateGuidAsString();
+ Send(SelfId(), new TEvents::TEvWakeup());
}
void TSqsProxyService::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) {
@@ -69,8 +74,7 @@ void TSqsProxyService::HandleSqsRequest(TSqsEvents::TEvSqsRequest::TPtr& ev) {
void TSqsProxyService::HandleProxySqsRequest(TSqsEvents::TEvProxySqsRequest::TPtr& ev) {
TProxyRequestInfoRef request = new TProxyRequestInfo(std::move(ev));
RequestsToProxy_.emplace(request->RequestId, request);
- Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetLeaderNodeForQueueRequest(request->RequestId, request->ProxyRequest->Get()->UserName, request->ProxyRequest->Get()->QueueName));
- RLOG_SQS_REQ_DEBUG(request->RequestId, "Send get leader node request to sqs service");
+ RequestLeaderNode(request->RequestId, request->ProxyRequest->Get()->UserName, request->ProxyRequest->Get()->QueueName);
}
static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatusToProxyStatus(TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus status) {
@@ -89,8 +93,85 @@ static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatus
}
}
+TSqsProxyService::TReloadStateRequestsInfoPtr TSqsProxyService::GetReloadStateRequestsInfo(const TString& user, const TString& queue) {
+ auto userIt = ReloadStateRequestsInfo_.find(user);
+ if (userIt != ReloadStateRequestsInfo_.end()) {
+ auto queueIt = userIt->second.find(queue);
+ if (queueIt != userIt->second.end()) {
+ return queueIt->second;
+ }
+ }
+ return nullptr;
+}
+
+void TSqsProxyService::RemoveReloadStateRequestsInfo(const TString& user, const TString& queue) {
+ auto userIt = ReloadStateRequestsInfo_.find(user);
+ if (userIt != ReloadStateRequestsInfo_.end()) {
+ userIt->second.erase(queue);
+ if (userIt->second.empty()) {
+ ReloadStateRequestsInfo_.erase(userIt);
+ }
+ }
+}
+
+void TSqsProxyService::ScheduleReloadStateRequest(TReloadStateRequestsInfoPtr info) {
+ info->RequestSendedAt = TInstant::Zero();
+ info->LeaderNodeRequested = false;
+ info->PlannedToSend = true;
+ ReloadStatePlanningToSend_.push_back(std::make_pair(TActivationContext::Now() + RESEND_RELOAD_REQUEST_PERIOD, info));
+
+}
+
+void TSqsProxyService::SendReloadStateIfNeeded(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev) {
+ auto info = GetReloadStateRequestsInfo(ev->Get()->UserName, ev->Get()->QueueName);
+ if (!info) {
+ return;
+ }
+
+ if (info->RequestSendedAt == TInstant::Zero() && !info->PlannedToSend) {
+ if (ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::OK) {
+ Send(MakeSqsServiceID(ev->Get()->NodeId), new TSqsEvents::TEvReloadStateRequest(info->User, info->Queue));
+
+ info->RequestSendedAt = TActivationContext::Now();
+ ReloadStateRequestSended_[info->RequestSendedAt].insert(info);
+ } else if (
+ ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoUser
+ || ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue
+ ) {
+ RemoveReloadStateRequestsInfo(ev->Get()->UserName, ev->Get()->QueueName);
+ } else {
+ ScheduleReloadStateRequest(info);
+ }
+ }
+}
+
+void TSqsProxyService::HandleWakeup(TEvents::TEvWakeup::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
+ auto now = TActivationContext::Now();
+ while (!ReloadStatePlanningToSend_.empty() && ReloadStatePlanningToSend_.front().first <= now) {
+ auto info = ReloadStatePlanningToSend_.front().second;
+ ReloadStatePlanningToSend_.pop_front();
+
+ RequestLeaderNode(info);
+ }
+ auto timeoutBorder = now - RESEND_RELOAD_REQUEST_PERIOD;
+ while (!ReloadStateRequestSended_.empty() && ReloadStateRequestSended_.begin()->first <= timeoutBorder) {
+ for (auto info : ReloadStateRequestSended_.begin()->second) {
+ RequestLeaderNode(info);
+ }
+ ReloadStateRequestSended_.erase(ReloadStateRequestSended_.begin());
+ }
+
+ Schedule(TDuration::MilliSeconds(100), new TEvWakeup());
+}
+
void TSqsProxyService::HandleGetLeaderNodeForQueueResponse(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev) {
RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Got leader node for queue response. Node id: " << ev->Get()->NodeId << ". Status: " << static_cast<int>(ev->Get()->Status));
+
+ if (ev->Get()->RequestId == ReloadStateRequestId_) {
+ SendReloadStateIfNeeded(ev);
+ return;
+ }
+
const auto requestIt = RequestsToProxy_.find(ev->Get()->RequestId);
if (requestIt == RequestsToProxy_.end()) {
RLOG_SQS_REQ_ERROR(ev->Get()->RequestId, "Request was not found in requests to proxy map");
@@ -150,6 +231,54 @@ void TSqsProxyService::HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
HandleDisconnect(nodeId);
}
+void TSqsProxyService::RequestLeaderNode(TReloadStateRequestsInfoPtr info) {
+ info->PlannedToSend = false;
+ info->LeaderNodeRequested = true;
+ info->RequestSendedAt = TInstant::Zero();
+
+ RequestLeaderNode(ReloadStateRequestId_, info->User, info->Queue);
+}
+
+void TSqsProxyService::RequestLeaderNode(const TString& reqId, const TString& user, const TString& queue) {
+ Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetLeaderNodeForQueueRequest(reqId, user, queue));
+ RLOG_SQS_REQ_DEBUG(reqId, "Send get leader node request to sqs service for " << user << "/" << queue);
+}
+
+void TSqsProxyService::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) {
+ const auto& target = ev->Get()->Record.GetTarget();
+
+ auto info = GetReloadStateRequestsInfo(target.GetUserName(), target.GetQueueName());
+ if (!info) {
+ info = new TReloadStateRequestsInfo();
+ info->User = target.GetUserName();
+ info->Queue = target.GetQueueName();
+ ReloadStateRequestsInfo_[target.GetUserName()][target.GetQueueName()] = info;
+ }
+
+ info->ReloadStateBorder = TActivationContext::Now();
+ if (info->RequestSendedAt == TInstant::Zero() && !info->LeaderNodeRequested) {
+ RequestLeaderNode(ReloadStateRequestId_, target.GetUserName(), target.GetQueueName());
+ }
+}
+
+void TSqsProxyService::HandleReloadStateResponse(TSqsEvents::TEvReloadStateResponse::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ const auto& who = record.GetWho();
+ auto info = GetReloadStateRequestsInfo(who.GetUserName(), who.GetQueueName());
+
+ if (!info) {
+ return;
+ }
+ ReloadStateRequestSended_[info->RequestSendedAt].erase(info);
+ TInstant reloadedAt = TInstant::MilliSeconds(record.GetReloadedAtMs());
+ if (reloadedAt < info->ReloadStateBorder) {
+ ScheduleReloadStateRequest(info);
+ } else {
+ RemoveReloadStateRequestsInfo(who.GetUserName(), who.GetQueueName());
+ }
+}
+
+
STATEFN(TSqsProxyService::StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
@@ -160,6 +289,9 @@ STATEFN(TSqsProxyService::StateFunc) {
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnect);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(TSqsEvents::TEvGetLeaderNodeForQueueResponse, HandleGetLeaderNodeForQueueResponse);
+ hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest);
+ hFunc(TSqsEvents::TEvReloadStateResponse, HandleReloadStateResponse);
+ HFunc(TEvents::TEvWakeup, HandleWakeup);
default:
LOG_SQS_ERROR("Unknown type of event came to SQS service actor: " << ev->Type << " (" << ev->GetTypeName() << "), sender: " << ev->Sender);
}
diff --git a/ydb/core/ymq/actor/proxy_service.h b/ydb/core/ymq/actor/proxy_service.h
index e3fc2c82903..4bbb72d497e 100644
--- a/ydb/core/ymq/actor/proxy_service.h
+++ b/ydb/core/ymq/actor/proxy_service.h
@@ -47,6 +47,19 @@ struct TReplierToSenderActorCallback : public IReplyCallback {
class TSqsProxyService
: public TActorBootstrapped<TSqsProxyService>
{
+private:
+ struct TReloadStateRequestsInfo : public TAtomicRefCount<TReloadStateRequestsInfo> {
+ TString User;
+ TString Queue;
+
+ TInstant RequestSendedAt;
+ TInstant ReloadStateBorder;
+ bool PlannedToSend = false;
+ bool LeaderNodeRequested = false;
+ };
+
+ using TReloadStateRequestsInfoPtr = TIntrusivePtr<TReloadStateRequestsInfo>;
+
public:
struct TNodeInfo;
using TNodeInfoRef = TIntrusivePtr<TNodeInfo>;
@@ -69,6 +82,15 @@ private:
void SendProxyError(TProxyRequestInfoRef request, TSqsEvents::TEvProxySqsResponse::EProxyStatus proxyStatus);
void SendProxyErrors(TNodeInfo& nodeInfo, TSqsEvents::TEvProxySqsResponse::EProxyStatus proxyStatus);
+
+ TReloadStateRequestsInfoPtr GetReloadStateRequestsInfo(const TString& user, const TString& queue);
+ void RemoveReloadStateRequestsInfo(const TString& user, const TString& queue);
+ void ScheduleReloadStateRequest(TReloadStateRequestsInfoPtr info);
+ void SendReloadStateIfNeeded(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev);
+ void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
+ void RequestLeaderNode(TReloadStateRequestsInfoPtr info);
+ void RequestLeaderNode(const TString& reqId, const TString& user, const TString& queue);
+
private:
STATEFN(StateFunc);
void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev);
@@ -80,6 +102,8 @@ private:
void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev);
void HandleDisconnect(ui32 nodeId);
void HandleGetLeaderNodeForQueueResponse(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev);
+ void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev);
+ void HandleReloadStateResponse(TSqsEvents::TEvReloadStateResponse::TPtr& ev);
private:
TIntrusivePtr<::NMonitoring::TDynamicCounters> SqsCounters_;
@@ -89,6 +113,12 @@ private:
THashMap<ui32, TNodeInfoRef> NodesInfo_;
THashMap<TString, TProxyRequestInfoRef> RequestsToProxy_;
+
+ THashMap<TString, THashMap<TString, TReloadStateRequestsInfoPtr>> ReloadStateRequestsInfo_;
+ TDeque<std::pair<TInstant, TReloadStateRequestsInfoPtr>> ReloadStatePlanningToSend_;
+ TMap<TInstant, THashSet<TReloadStateRequestsInfoPtr>> ReloadStateRequestSended_;
+ TString ReloadStateRequestId_;
+
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index de9277ee717..1de9b4290ab 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -134,6 +134,7 @@ STATEFN(TQueueLeader::StateInit) {
hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileIniting); // from get queue attributes action actor
hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues
hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState);
+ hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest);
// internal
hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version
@@ -159,6 +160,7 @@ STATEFN(TQueueLeader::StateWorking) {
hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileWorking); // from get queue attributes action actor
hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues
hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState);
+ hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest);
// internal
hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version
@@ -214,6 +216,7 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) {
break;
}
case UPDATE_MESSAGES_METRICS_TAG: {
+ CheckStillDLQ();
PlanningRetentionWakeup();
ReportOldestTimestampMetricsIfReady();
ReportMessagesCountMetricsIfReady();
@@ -230,6 +233,11 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) {
}
+void TQueueLeader::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) {
+ ReloadStateRequestedFromNodes.insert(ev->Sender.NodeId());
+ ForceReloadState();
+}
+
void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev) {
if (!UseCPUOptimization) {
return;
@@ -238,11 +246,15 @@ void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr&
if (nextTryAfter != TDuration::Max()) {
Schedule(nextTryAfter, new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod(nextTryAfter)));
}
+ ForceReloadState();
+}
- if (UpdateStateRequestInProcess) {
+void TQueueLeader::ForceReloadState() {
+ if (UpdateStateRequestStartedAt) {
LOG_SQS_DEBUG("Update state request already in process for queue " << TLogQueueName(UserName_, QueueName_));
return;
}
+ UpdateStateRequestStartedAt = TActivationContext::Now();
LOG_SQS_DEBUG("Start update state request for queue " << TLogQueueName(UserName_, QueueName_));
TExecutorBuilder(SelfId(), "")
.User(UserName_)
@@ -264,9 +276,9 @@ void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr&
void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) {
LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_));
- Y_VERIFY(!UpdateStateRequestInProcess);
- UpdateStateRequestInProcess = false;
+ Y_VERIFY(UpdateStateRequestStartedAt != TInstant::Zero());
+ bool success = reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete;
if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
using NKikimr::NClient::TValue;
const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
@@ -295,10 +307,24 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) {
}
shardInfo.MessagesCountWasGot = true;
+ ProcessGetRuntimeQueueAttributes(shard);
+ for (auto nodeId : ReloadStateRequestedFromNodes) {
+ Send(
+ MakeSqsProxyServiceID(nodeId),
+ new TSqsEvents::TEvReloadStateResponse(
+ UserName_,
+ QueueName_,
+ success ? UpdateStateRequestStartedAt : TInstant::Zero()
+ )
+ );
+ }
+ ReloadStateRequestedFromNodes.clear();
}
} else {
LOG_SQS_ERROR("Failed to update state for " << TLogQueueName(UserName_, QueueName_) << ": " << reply);
}
+
+ UpdateStateRequestStartedAt = TInstant::Zero();
}
void TQueueLeader::HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev) {
@@ -753,6 +779,7 @@ void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& val
RequestOldestTimestampMetrics(0);
}
}
+ SendReloadStateRequestToDLQ();
}
reqInfo.Answer->Messages.resize(list.Size());
@@ -938,6 +965,14 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o
}
}
+void TQueueLeader::SendReloadStateRequestToDLQ() {
+ if (DlqInfo_) {
+ Send(MakeSqsProxyServiceID(SelfId().NodeId()), new TSqsEvents::TEvReloadStateRequest(UserName_, DlqInfo_->QueueId));
+ } else {
+ LOG_SQS_ERROR("Leader for " << TLogQueueName(UserName_, QueueName_) << " don't know about dlq, but messages moved");
+ }
+}
+
void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) {
const NKikimr::NClient::TValue list(value["result"]);
Y_VERIFY(list.Size() == batch->Size());
@@ -946,6 +981,7 @@ void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue&
ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
SetMessagesCount(shard, value["newMessagesCount"]);
+ SendReloadStateRequestToDLQ();
}
THashMap<ui64, const TLoadBatchEntry*> offset2entry;
@@ -1619,7 +1655,7 @@ void TQueueLeader::HandlePurgeQueue(TSqsEvents::TEvPurgeQueue::TPtr& ev) {
Send(PurgeActor_, MakeHolder<TSqsEvents::TEvPurgeQueue>(*ev->Get()));
}
-void TQueueLeader::StartGatheringMetrics() {
+void TQueueLeader::CheckStillDLQ() {
if (!IsFifoQueue_ && (TActivationContext::Now() - LatestDlqNotificationTs_ >= TDuration::MilliSeconds(Cfg().GetDlqNotificationGracePeriodMs()))) {
if (IsDlqQueue_) {
LOG_SQS_INFO("Stopped periodic message counting for queue " << TLogQueueName(UserName_, QueueName_)
@@ -1628,7 +1664,10 @@ void TQueueLeader::StartGatheringMetrics() {
IsDlqQueue_ = false;
}
-
+}
+
+
+void TQueueLeader::StartGatheringMetrics() {
if (UseCPUOptimization) {
return;
}
@@ -2330,19 +2369,7 @@ void TQueueLeader::HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGe
void TQueueLeader::HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr&) {
LatestDlqNotificationTs_ = TActivationContext::Now();
-
- if (!IsDlqQueue_) {
- bool enablePeriodicMessagesCounting = UseCPUOptimization || !IsFifoQueue_; // we need to start the process only once
-
- IsDlqQueue_ = true;
- UseCPUOptimization = false;
-
- if (enablePeriodicMessagesCounting) {
- LOG_SQS_INFO("Started periodic message counting for queue " << TLogQueueName(UserName_, QueueName_)
- << ". Latest dlq notification was at " << LatestDlqNotificationTs_);
- StartGatheringMetrics();
- }
- }
+ IsDlqQueue_ = true;
}
void TQueueLeader::ProcessGetRuntimeQueueAttributes(TGetRuntimeQueueAttributesRequestProcessing& reqInfo) {
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index f3a8fb7ceb0..e5f54ff3c88 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -80,7 +80,10 @@ private:
void HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev);
void HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr& ev);
void HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev);
+ void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev);
+ void CheckStillDLQ();
+ void ForceReloadState();
void BecomeWorking();
void RequestConfiguration();
void UpdateStateRequest();
@@ -120,6 +123,7 @@ private:
void ProcessGetRuntimeQueueAttributes(ui64 shard);
void FailGetRuntimeQueueAttributesForShard(ui64 shard);
void FailRequestsDuringStartProblems();
+ void SendReloadStateRequestToDLQ();
// send
void ProcessSendMessageBatch(TSendMessageBatchRequestProcessing& reqInfo);
@@ -437,7 +441,8 @@ private:
TMessageDelayStatistics DelayStatistics_;
TInstant RetentionWakeupPlannedAt_;
bool AskQueueAttributesInProcess_ = false;
- bool UpdateStateRequestInProcess = false;
+ TInstant UpdateStateRequestStartedAt;
+ THashSet<ui32> ReloadStateRequestedFromNodes;
bool UseCPUOptimization = false;
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index 016cf0da0d4..d6d6a790a25 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -257,7 +257,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
TLocalRateLimiterResource DeleteObjectsQuoterResource_;
TLocalRateLimiterResource OtherActionsQuoterResource_;
i64 EarlyRequestQueuesListBudget_ = EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET; // Defence from continuously requesting queues list.
- bool UseLeaderCPUOptimization = false;
+ bool UseLeaderCPUOptimization = true;
// State machine
THashMultiMap<TString, TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; // queue name -> request
@@ -345,6 +345,7 @@ STATEFN(TSqsService::StateFunc) {
hFunc(TEvWakeup, HandleWakeup);
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleDescribeSchemeResult);
hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
+ hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest);
hFunc(TSqsEvents::TEvNodeTrackerSubscriptionStatus, HandleNodeTrackingSubscriptionStatus);
hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfiguration);
hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest);
@@ -748,6 +749,19 @@ TSqsService::TUserInfoPtr TSqsService::GetUserOrWait(TAutoPtr<TEvent>& ev) {
return userIt->second;
}
+void TSqsService::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) {
+ const auto userIt = Users_.find(ev->Get()->Record.GetTarget().GetUserName());
+ if (userIt != Users_.end()) {
+ auto queueIt = userIt->second->Queues_.find(ev->Get()->Record.GetTarget().GetQueueName());
+ if (queueIt != userIt->second->Queues_.end()) {
+ if (queueIt->second->LocalLeader_) {
+ Send(ev->Forward(queueIt->second->LocalLeader_));
+ return;
+ }
+ }
+ }
+}
+
void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev) {
ui64 subscriptionId = ev->Get()->SubscriptionId;
auto it = QueuePerNodeTrackingSubscription.find(subscriptionId);
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index 18fe2d55d70..f06cbb70962 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -57,6 +57,7 @@ private:
void HandleInsertQueueCounters(TSqsEvents::TEvInsertQueueCounters::TPtr& ev);
void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev);
void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev);
+ void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev);
void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev);
void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo);
void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo);
diff --git a/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt
index 5be1241c6fc..187b3709ebf 100644
--- a/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt
@@ -18,6 +18,18 @@ get_built_tool_path(
contrib/tools/protoc/plugins/cpp_styleguide
cpp_styleguide
)
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
add_library(core-ymq-proto)
target_link_libraries(core-ymq-proto PUBLIC
@@ -27,6 +39,7 @@ target_link_libraries(core-ymq-proto PUBLIC
contrib-libs-protobuf
)
target_proto_messages(core-ymq-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto
${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto
)
target_proto_addincls(core-ymq-proto
diff --git a/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt b/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt
index da8e31b84dc..ddf57faa3fb 100644
--- a/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,18 @@ get_built_tool_path(
contrib/tools/protoc/plugins/cpp_styleguide
cpp_styleguide
)
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
add_library(core-ymq-proto)
target_link_libraries(core-ymq-proto PUBLIC
@@ -28,6 +40,7 @@ target_link_libraries(core-ymq-proto PUBLIC
contrib-libs-protobuf
)
target_proto_messages(core-ymq-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto
${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto
)
target_proto_addincls(core-ymq-proto
diff --git a/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt
index da8e31b84dc..ddf57faa3fb 100644
--- a/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt
@@ -18,6 +18,18 @@ get_built_tool_path(
contrib/tools/protoc/plugins/cpp_styleguide
cpp_styleguide
)
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
add_library(core-ymq-proto)
target_link_libraries(core-ymq-proto PUBLIC
@@ -28,6 +40,7 @@ target_link_libraries(core-ymq-proto PUBLIC
contrib-libs-protobuf
)
target_proto_messages(core-ymq-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto
${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto
)
target_proto_addincls(core-ymq-proto
diff --git a/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt
index 5be1241c6fc..187b3709ebf 100644
--- a/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt
@@ -18,6 +18,18 @@ get_built_tool_path(
contrib/tools/protoc/plugins/cpp_styleguide
cpp_styleguide
)
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
add_library(core-ymq-proto)
target_link_libraries(core-ymq-proto PUBLIC
@@ -27,6 +39,7 @@ target_link_libraries(core-ymq-proto PUBLIC
contrib-libs-protobuf
)
target_proto_messages(core-ymq-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto
${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto
)
target_proto_addincls(core-ymq-proto
diff --git a/ydb/core/ymq/proto/events.proto b/ydb/core/ymq/proto/events.proto
new file mode 100644
index 00000000000..29e46413493
--- /dev/null
+++ b/ydb/core/ymq/proto/events.proto
@@ -0,0 +1,15 @@
+package NKikimr.NSQS;
+
+message TQueueInfo {
+ optional string UserName = 1;
+ optional string QueueName = 2;
+}
+
+message TReloadStateRequest {
+ optional TQueueInfo Target = 1;
+}
+
+message TReloadStateResponse {
+ optional TQueueInfo Who = 1;
+ optional uint64 ReloadedAtMs = 2;
+}