diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-06-01 11:15:27 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-06-01 11:15:27 +0300 |
commit | ad69a1b560522eea94ab9632a6b7176f8e8731df (patch) | |
tree | c7515e23b9ecc8a36e69e8598cbd8c4960bae989 | |
parent | 859bb5ff4cd6798053dbbf6e8440a4b124d6678d (diff) | |
download | ydb-ad69a1b560522eea94ab9632a6b7176f8e8731df.tar.gz |
[ymq] reload DLQ state on request
init
-rw-r--r-- | ydb/core/ymq/actor/events.h | 21 | ||||
-rw-r--r-- | ydb/core/ymq/actor/proxy_service.cpp | 136 | ||||
-rw-r--r-- | ydb/core/ymq/actor/proxy_service.h | 30 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 63 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 7 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 16 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.h | 1 | ||||
-rw-r--r-- | ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt | 13 | ||||
-rw-r--r-- | ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt | 13 | ||||
-rw-r--r-- | ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt | 13 | ||||
-rw-r--r-- | ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt | 13 | ||||
-rw-r--r-- | ydb/core/ymq/proto/events.proto | 15 |
12 files changed, 319 insertions, 22 deletions
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index 0132e1e0d33..f500e443916 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -14,6 +14,7 @@ #include <ydb/core/ymq/base/processed_request_attributes.h> #include <ydb/core/ymq/base/query_id.h> #include <ydb/core/ymq/base/queue_path.h> +#include <ydb/core/ymq/proto/events.pb.h> #include <ydb/core/ymq/proto/records.pb.h> #include <library/cpp/actors/core/event_pb.h> @@ -131,6 +132,8 @@ struct TSqsEvents { EvNodeTrackerSubscriptionStatus, EvForceReloadState, + EvReloadStateRequest, + EvReloadStateResponse, EvEnd, }; @@ -945,6 +948,24 @@ struct TSqsEvents { {} TDuration NextTryAfter; }; + + struct TEvReloadStateRequest : public NActors::TEventPB<TEvReloadStateRequest, TReloadStateRequest, EvReloadStateRequest> { + TEvReloadStateRequest() = default; + + TEvReloadStateRequest(const TString& user, const TString& queue) { + Record.MutableTarget()->SetUserName(user); + Record.MutableTarget()->SetQueueName(queue); + } + }; + + struct TEvReloadStateResponse : public NActors::TEventPB<TEvReloadStateResponse, TReloadStateResponse, EvReloadStateResponse> { + TEvReloadStateResponse() = default; + TEvReloadStateResponse(const TString& user, const TString& queue, TInstant reloadedAt) { + Record.MutableWho()->SetUserName(user); + Record.MutableWho()->SetQueueName(queue); + Record.SetReloadedAtMs(reloadedAt.MilliSeconds()); + } + }; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/proxy_service.cpp b/ydb/core/ymq/actor/proxy_service.cpp index 4990ac55823..ea3e096e042 100644 --- a/ydb/core/ymq/actor/proxy_service.cpp +++ b/ydb/core/ymq/actor/proxy_service.cpp @@ -24,6 +24,8 @@ using NKikimr::NClient::TValue; namespace NKikimr::NSQS { +constexpr TDuration RESEND_RELOAD_REQUEST_PERIOD = TDuration::Seconds(5); + struct TSqsProxyService::TNodeInfo : public TAtomicRefCount<TNodeInfo> { explicit TNodeInfo(ui32 nodeId) : NodeId(nodeId) @@ -53,6 +55,9 @@ void TSqsProxyService::Bootstrap() { SqsCounters_ = GetSqsServiceCounters(AppData()->Counters, "core"); YmqPublicCounters_ = GetYmqPublicCounters(AppData()->Counters); + + ReloadStateRequestId_ = CreateGuidAsString(); + Send(SelfId(), new TEvents::TEvWakeup()); } void TSqsProxyService::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) { @@ -69,8 +74,7 @@ void TSqsProxyService::HandleSqsRequest(TSqsEvents::TEvSqsRequest::TPtr& ev) { void TSqsProxyService::HandleProxySqsRequest(TSqsEvents::TEvProxySqsRequest::TPtr& ev) { TProxyRequestInfoRef request = new TProxyRequestInfo(std::move(ev)); RequestsToProxy_.emplace(request->RequestId, request); - Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetLeaderNodeForQueueRequest(request->RequestId, request->ProxyRequest->Get()->UserName, request->ProxyRequest->Get()->QueueName)); - RLOG_SQS_REQ_DEBUG(request->RequestId, "Send get leader node request to sqs service"); + RequestLeaderNode(request->RequestId, request->ProxyRequest->Get()->UserName, request->ProxyRequest->Get()->QueueName); } static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatusToProxyStatus(TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus status) { @@ -89,8 +93,85 @@ static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatus } } +TSqsProxyService::TReloadStateRequestsInfoPtr TSqsProxyService::GetReloadStateRequestsInfo(const TString& user, const TString& queue) { + auto userIt = ReloadStateRequestsInfo_.find(user); + if (userIt != ReloadStateRequestsInfo_.end()) { + auto queueIt = userIt->second.find(queue); + if (queueIt != userIt->second.end()) { + return queueIt->second; + } + } + return nullptr; +} + +void TSqsProxyService::RemoveReloadStateRequestsInfo(const TString& user, const TString& queue) { + auto userIt = ReloadStateRequestsInfo_.find(user); + if (userIt != ReloadStateRequestsInfo_.end()) { + userIt->second.erase(queue); + if (userIt->second.empty()) { + ReloadStateRequestsInfo_.erase(userIt); + } + } +} + +void TSqsProxyService::ScheduleReloadStateRequest(TReloadStateRequestsInfoPtr info) { + info->RequestSendedAt = TInstant::Zero(); + info->LeaderNodeRequested = false; + info->PlannedToSend = true; + ReloadStatePlanningToSend_.push_back(std::make_pair(TActivationContext::Now() + RESEND_RELOAD_REQUEST_PERIOD, info)); + +} + +void TSqsProxyService::SendReloadStateIfNeeded(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev) { + auto info = GetReloadStateRequestsInfo(ev->Get()->UserName, ev->Get()->QueueName); + if (!info) { + return; + } + + if (info->RequestSendedAt == TInstant::Zero() && !info->PlannedToSend) { + if (ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::OK) { + Send(MakeSqsServiceID(ev->Get()->NodeId), new TSqsEvents::TEvReloadStateRequest(info->User, info->Queue)); + + info->RequestSendedAt = TActivationContext::Now(); + ReloadStateRequestSended_[info->RequestSendedAt].insert(info); + } else if ( + ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoUser + || ev->Get()->Status == TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue + ) { + RemoveReloadStateRequestsInfo(ev->Get()->UserName, ev->Get()->QueueName); + } else { + ScheduleReloadStateRequest(info); + } + } +} + +void TSqsProxyService::HandleWakeup(TEvents::TEvWakeup::TPtr& /*ev*/, const TActorContext& /*ctx*/) { + auto now = TActivationContext::Now(); + while (!ReloadStatePlanningToSend_.empty() && ReloadStatePlanningToSend_.front().first <= now) { + auto info = ReloadStatePlanningToSend_.front().second; + ReloadStatePlanningToSend_.pop_front(); + + RequestLeaderNode(info); + } + auto timeoutBorder = now - RESEND_RELOAD_REQUEST_PERIOD; + while (!ReloadStateRequestSended_.empty() && ReloadStateRequestSended_.begin()->first <= timeoutBorder) { + for (auto info : ReloadStateRequestSended_.begin()->second) { + RequestLeaderNode(info); + } + ReloadStateRequestSended_.erase(ReloadStateRequestSended_.begin()); + } + + Schedule(TDuration::MilliSeconds(100), new TEvWakeup()); +} + void TSqsProxyService::HandleGetLeaderNodeForQueueResponse(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev) { RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Got leader node for queue response. Node id: " << ev->Get()->NodeId << ". Status: " << static_cast<int>(ev->Get()->Status)); + + if (ev->Get()->RequestId == ReloadStateRequestId_) { + SendReloadStateIfNeeded(ev); + return; + } + const auto requestIt = RequestsToProxy_.find(ev->Get()->RequestId); if (requestIt == RequestsToProxy_.end()) { RLOG_SQS_REQ_ERROR(ev->Get()->RequestId, "Request was not found in requests to proxy map"); @@ -150,6 +231,54 @@ void TSqsProxyService::HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { HandleDisconnect(nodeId); } +void TSqsProxyService::RequestLeaderNode(TReloadStateRequestsInfoPtr info) { + info->PlannedToSend = false; + info->LeaderNodeRequested = true; + info->RequestSendedAt = TInstant::Zero(); + + RequestLeaderNode(ReloadStateRequestId_, info->User, info->Queue); +} + +void TSqsProxyService::RequestLeaderNode(const TString& reqId, const TString& user, const TString& queue) { + Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvGetLeaderNodeForQueueRequest(reqId, user, queue)); + RLOG_SQS_REQ_DEBUG(reqId, "Send get leader node request to sqs service for " << user << "/" << queue); +} + +void TSqsProxyService::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) { + const auto& target = ev->Get()->Record.GetTarget(); + + auto info = GetReloadStateRequestsInfo(target.GetUserName(), target.GetQueueName()); + if (!info) { + info = new TReloadStateRequestsInfo(); + info->User = target.GetUserName(); + info->Queue = target.GetQueueName(); + ReloadStateRequestsInfo_[target.GetUserName()][target.GetQueueName()] = info; + } + + info->ReloadStateBorder = TActivationContext::Now(); + if (info->RequestSendedAt == TInstant::Zero() && !info->LeaderNodeRequested) { + RequestLeaderNode(ReloadStateRequestId_, target.GetUserName(), target.GetQueueName()); + } +} + +void TSqsProxyService::HandleReloadStateResponse(TSqsEvents::TEvReloadStateResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + const auto& who = record.GetWho(); + auto info = GetReloadStateRequestsInfo(who.GetUserName(), who.GetQueueName()); + + if (!info) { + return; + } + ReloadStateRequestSended_[info->RequestSendedAt].erase(info); + TInstant reloadedAt = TInstant::MilliSeconds(record.GetReloadedAtMs()); + if (reloadedAt < info->ReloadStateBorder) { + ScheduleReloadStateRequest(info); + } else { + RemoveReloadStateRequestsInfo(who.GetUserName(), who.GetQueueName()); + } +} + + STATEFN(TSqsProxyService::StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TSqsEvents::TEvExecuted, HandleExecuted); @@ -160,6 +289,9 @@ STATEFN(TSqsProxyService::StateFunc) { hFunc(TEvInterconnect::TEvNodeConnected, HandleConnect); hFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TSqsEvents::TEvGetLeaderNodeForQueueResponse, HandleGetLeaderNodeForQueueResponse); + hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest); + hFunc(TSqsEvents::TEvReloadStateResponse, HandleReloadStateResponse); + HFunc(TEvents::TEvWakeup, HandleWakeup); default: LOG_SQS_ERROR("Unknown type of event came to SQS service actor: " << ev->Type << " (" << ev->GetTypeName() << "), sender: " << ev->Sender); } diff --git a/ydb/core/ymq/actor/proxy_service.h b/ydb/core/ymq/actor/proxy_service.h index e3fc2c82903..4bbb72d497e 100644 --- a/ydb/core/ymq/actor/proxy_service.h +++ b/ydb/core/ymq/actor/proxy_service.h @@ -47,6 +47,19 @@ struct TReplierToSenderActorCallback : public IReplyCallback { class TSqsProxyService : public TActorBootstrapped<TSqsProxyService> { +private: + struct TReloadStateRequestsInfo : public TAtomicRefCount<TReloadStateRequestsInfo> { + TString User; + TString Queue; + + TInstant RequestSendedAt; + TInstant ReloadStateBorder; + bool PlannedToSend = false; + bool LeaderNodeRequested = false; + }; + + using TReloadStateRequestsInfoPtr = TIntrusivePtr<TReloadStateRequestsInfo>; + public: struct TNodeInfo; using TNodeInfoRef = TIntrusivePtr<TNodeInfo>; @@ -69,6 +82,15 @@ private: void SendProxyError(TProxyRequestInfoRef request, TSqsEvents::TEvProxySqsResponse::EProxyStatus proxyStatus); void SendProxyErrors(TNodeInfo& nodeInfo, TSqsEvents::TEvProxySqsResponse::EProxyStatus proxyStatus); + + TReloadStateRequestsInfoPtr GetReloadStateRequestsInfo(const TString& user, const TString& queue); + void RemoveReloadStateRequestsInfo(const TString& user, const TString& queue); + void ScheduleReloadStateRequest(TReloadStateRequestsInfoPtr info); + void SendReloadStateIfNeeded(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev); + void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + void RequestLeaderNode(TReloadStateRequestsInfoPtr info); + void RequestLeaderNode(const TString& reqId, const TString& user, const TString& queue); + private: STATEFN(StateFunc); void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev); @@ -80,6 +102,8 @@ private: void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev); void HandleDisconnect(ui32 nodeId); void HandleGetLeaderNodeForQueueResponse(TSqsEvents::TEvGetLeaderNodeForQueueResponse::TPtr& ev); + void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev); + void HandleReloadStateResponse(TSqsEvents::TEvReloadStateResponse::TPtr& ev); private: TIntrusivePtr<::NMonitoring::TDynamicCounters> SqsCounters_; @@ -89,6 +113,12 @@ private: THashMap<ui32, TNodeInfoRef> NodesInfo_; THashMap<TString, TProxyRequestInfoRef> RequestsToProxy_; + + THashMap<TString, THashMap<TString, TReloadStateRequestsInfoPtr>> ReloadStateRequestsInfo_; + TDeque<std::pair<TInstant, TReloadStateRequestsInfoPtr>> ReloadStatePlanningToSend_; + TMap<TInstant, THashSet<TReloadStateRequestsInfoPtr>> ReloadStateRequestSended_; + TString ReloadStateRequestId_; + }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index de9277ee717..1de9b4290ab 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -134,6 +134,7 @@ STATEFN(TQueueLeader::StateInit) { hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileIniting); // from get queue attributes action actor hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState); + hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest); // internal hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version @@ -159,6 +160,7 @@ STATEFN(TQueueLeader::StateWorking) { hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileWorking); // from get queue attributes action actor hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState); + hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest); // internal hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version @@ -214,6 +216,7 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) { break; } case UPDATE_MESSAGES_METRICS_TAG: { + CheckStillDLQ(); PlanningRetentionWakeup(); ReportOldestTimestampMetricsIfReady(); ReportMessagesCountMetricsIfReady(); @@ -230,6 +233,11 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) { } +void TQueueLeader::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) { + ReloadStateRequestedFromNodes.insert(ev->Sender.NodeId()); + ForceReloadState(); +} + void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev) { if (!UseCPUOptimization) { return; @@ -238,11 +246,15 @@ void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& if (nextTryAfter != TDuration::Max()) { Schedule(nextTryAfter, new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod(nextTryAfter))); } + ForceReloadState(); +} - if (UpdateStateRequestInProcess) { +void TQueueLeader::ForceReloadState() { + if (UpdateStateRequestStartedAt) { LOG_SQS_DEBUG("Update state request already in process for queue " << TLogQueueName(UserName_, QueueName_)); return; } + UpdateStateRequestStartedAt = TActivationContext::Now(); LOG_SQS_DEBUG("Start update state request for queue " << TLogQueueName(UserName_, QueueName_)); TExecutorBuilder(SelfId(), "") .User(UserName_) @@ -264,9 +276,9 @@ void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) { LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_)); - Y_VERIFY(!UpdateStateRequestInProcess); - UpdateStateRequestInProcess = false; + Y_VERIFY(UpdateStateRequestStartedAt != TInstant::Zero()); + bool success = reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete; if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { using NKikimr::NClient::TValue; const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); @@ -295,10 +307,24 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) { } shardInfo.MessagesCountWasGot = true; + ProcessGetRuntimeQueueAttributes(shard); + for (auto nodeId : ReloadStateRequestedFromNodes) { + Send( + MakeSqsProxyServiceID(nodeId), + new TSqsEvents::TEvReloadStateResponse( + UserName_, + QueueName_, + success ? UpdateStateRequestStartedAt : TInstant::Zero() + ) + ); + } + ReloadStateRequestedFromNodes.clear(); } } else { LOG_SQS_ERROR("Failed to update state for " << TLogQueueName(UserName_, QueueName_) << ": " << reply); } + + UpdateStateRequestStartedAt = TInstant::Zero(); } void TQueueLeader::HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev) { @@ -753,6 +779,7 @@ void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& val RequestOldestTimestampMetrics(0); } } + SendReloadStateRequestToDLQ(); } reqInfo.Answer->Messages.resize(list.Size()); @@ -938,6 +965,14 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o } } +void TQueueLeader::SendReloadStateRequestToDLQ() { + if (DlqInfo_) { + Send(MakeSqsProxyServiceID(SelfId().NodeId()), new TSqsEvents::TEvReloadStateRequest(UserName_, DlqInfo_->QueueId)); + } else { + LOG_SQS_ERROR("Leader for " << TLogQueueName(UserName_, QueueName_) << " don't know about dlq, but messages moved"); + } +} + void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) { const NKikimr::NClient::TValue list(value["result"]); Y_VERIFY(list.Size() == batch->Size()); @@ -946,6 +981,7 @@ void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); SetMessagesCount(shard, value["newMessagesCount"]); + SendReloadStateRequestToDLQ(); } THashMap<ui64, const TLoadBatchEntry*> offset2entry; @@ -1619,7 +1655,7 @@ void TQueueLeader::HandlePurgeQueue(TSqsEvents::TEvPurgeQueue::TPtr& ev) { Send(PurgeActor_, MakeHolder<TSqsEvents::TEvPurgeQueue>(*ev->Get())); } -void TQueueLeader::StartGatheringMetrics() { +void TQueueLeader::CheckStillDLQ() { if (!IsFifoQueue_ && (TActivationContext::Now() - LatestDlqNotificationTs_ >= TDuration::MilliSeconds(Cfg().GetDlqNotificationGracePeriodMs()))) { if (IsDlqQueue_) { LOG_SQS_INFO("Stopped periodic message counting for queue " << TLogQueueName(UserName_, QueueName_) @@ -1628,7 +1664,10 @@ void TQueueLeader::StartGatheringMetrics() { IsDlqQueue_ = false; } - +} + + +void TQueueLeader::StartGatheringMetrics() { if (UseCPUOptimization) { return; } @@ -2330,19 +2369,7 @@ void TQueueLeader::HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGe void TQueueLeader::HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr&) { LatestDlqNotificationTs_ = TActivationContext::Now(); - - if (!IsDlqQueue_) { - bool enablePeriodicMessagesCounting = UseCPUOptimization || !IsFifoQueue_; // we need to start the process only once - - IsDlqQueue_ = true; - UseCPUOptimization = false; - - if (enablePeriodicMessagesCounting) { - LOG_SQS_INFO("Started periodic message counting for queue " << TLogQueueName(UserName_, QueueName_) - << ". Latest dlq notification was at " << LatestDlqNotificationTs_); - StartGatheringMetrics(); - } - } + IsDlqQueue_ = true; } void TQueueLeader::ProcessGetRuntimeQueueAttributes(TGetRuntimeQueueAttributesRequestProcessing& reqInfo) { diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index f3a8fb7ceb0..e5f54ff3c88 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -80,7 +80,10 @@ private: void HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev); void HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr& ev); void HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev); + void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev); + void CheckStillDLQ(); + void ForceReloadState(); void BecomeWorking(); void RequestConfiguration(); void UpdateStateRequest(); @@ -120,6 +123,7 @@ private: void ProcessGetRuntimeQueueAttributes(ui64 shard); void FailGetRuntimeQueueAttributesForShard(ui64 shard); void FailRequestsDuringStartProblems(); + void SendReloadStateRequestToDLQ(); // send void ProcessSendMessageBatch(TSendMessageBatchRequestProcessing& reqInfo); @@ -437,7 +441,8 @@ private: TMessageDelayStatistics DelayStatistics_; TInstant RetentionWakeupPlannedAt_; bool AskQueueAttributesInProcess_ = false; - bool UpdateStateRequestInProcess = false; + TInstant UpdateStateRequestStartedAt; + THashSet<ui32> ReloadStateRequestedFromNodes; bool UseCPUOptimization = false; diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 016cf0da0d4..d6d6a790a25 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -257,7 +257,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> { TLocalRateLimiterResource DeleteObjectsQuoterResource_; TLocalRateLimiterResource OtherActionsQuoterResource_; i64 EarlyRequestQueuesListBudget_ = EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET; // Defence from continuously requesting queues list. - bool UseLeaderCPUOptimization = false; + bool UseLeaderCPUOptimization = true; // State machine THashMultiMap<TString, TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; // queue name -> request @@ -345,6 +345,7 @@ STATEFN(TSqsService::StateFunc) { hFunc(TEvWakeup, HandleWakeup); hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleDescribeSchemeResult); hFunc(TSqsEvents::TEvExecuted, HandleExecuted); + hFunc(TSqsEvents::TEvReloadStateRequest, HandleReloadStateRequest); hFunc(TSqsEvents::TEvNodeTrackerSubscriptionStatus, HandleNodeTrackingSubscriptionStatus); hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfiguration); hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest); @@ -748,6 +749,19 @@ TSqsService::TUserInfoPtr TSqsService::GetUserOrWait(TAutoPtr<TEvent>& ev) { return userIt->second; } +void TSqsService::HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev) { + const auto userIt = Users_.find(ev->Get()->Record.GetTarget().GetUserName()); + if (userIt != Users_.end()) { + auto queueIt = userIt->second->Queues_.find(ev->Get()->Record.GetTarget().GetQueueName()); + if (queueIt != userIt->second->Queues_.end()) { + if (queueIt->second->LocalLeader_) { + Send(ev->Forward(queueIt->second->LocalLeader_)); + return; + } + } + } +} + void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev) { ui64 subscriptionId = ev->Get()->SubscriptionId; auto it = QueuePerNodeTrackingSubscription.find(subscriptionId); diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h index 18fe2d55d70..f06cbb70962 100644 --- a/ydb/core/ymq/actor/service.h +++ b/ydb/core/ymq/actor/service.h @@ -57,6 +57,7 @@ private: void HandleInsertQueueCounters(TSqsEvents::TEvInsertQueueCounters::TPtr& ev); void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev); void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev); + void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev); void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev); void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo); void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo); diff --git a/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt index 5be1241c6fc..187b3709ebf 100644 --- a/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/ymq/proto/CMakeLists.darwin-x86_64.txt @@ -18,6 +18,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(core-ymq-proto) target_link_libraries(core-ymq-proto PUBLIC @@ -27,6 +39,7 @@ target_link_libraries(core-ymq-proto PUBLIC contrib-libs-protobuf ) target_proto_messages(core-ymq-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto ) target_proto_addincls(core-ymq-proto diff --git a/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt b/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt index da8e31b84dc..ddf57faa3fb 100644 --- a/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt +++ b/ydb/core/ymq/proto/CMakeLists.linux-aarch64.txt @@ -18,6 +18,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(core-ymq-proto) target_link_libraries(core-ymq-proto PUBLIC @@ -28,6 +40,7 @@ target_link_libraries(core-ymq-proto PUBLIC contrib-libs-protobuf ) target_proto_messages(core-ymq-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto ) target_proto_addincls(core-ymq-proto diff --git a/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt index da8e31b84dc..ddf57faa3fb 100644 --- a/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt +++ b/ydb/core/ymq/proto/CMakeLists.linux-x86_64.txt @@ -18,6 +18,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(core-ymq-proto) target_link_libraries(core-ymq-proto PUBLIC @@ -28,6 +40,7 @@ target_link_libraries(core-ymq-proto PUBLIC contrib-libs-protobuf ) target_proto_messages(core-ymq-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto ) target_proto_addincls(core-ymq-proto diff --git a/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt b/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt index 5be1241c6fc..187b3709ebf 100644 --- a/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt +++ b/ydb/core/ymq/proto/CMakeLists.windows-x86_64.txt @@ -18,6 +18,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(core-ymq-proto) target_link_libraries(core-ymq-proto PUBLIC @@ -27,6 +39,7 @@ target_link_libraries(core-ymq-proto PUBLIC contrib-libs-protobuf ) target_proto_messages(core-ymq-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/events.proto ${CMAKE_SOURCE_DIR}/ydb/core/ymq/proto/records.proto ) target_proto_addincls(core-ymq-proto diff --git a/ydb/core/ymq/proto/events.proto b/ydb/core/ymq/proto/events.proto new file mode 100644 index 00000000000..29e46413493 --- /dev/null +++ b/ydb/core/ymq/proto/events.proto @@ -0,0 +1,15 @@ +package NKikimr.NSQS;
+
+message TQueueInfo {
+ optional string UserName = 1;
+ optional string QueueName = 2;
+}
+
+message TReloadStateRequest {
+ optional TQueueInfo Target = 1;
+}
+
+message TReloadStateResponse {
+ optional TQueueInfo Who = 1;
+ optional uint64 ReloadedAtMs = 2;
+}
|