diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-06-02 16:37:41 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-06-02 16:37:41 +0300 |
commit | a222577a9fdf332653f853e10b061c51a7f3a76c (patch) | |
tree | 9b80fdcca8e6650eeda9c0057e3a0212254159d9 | |
parent | a080f3314e29c6dcae3c36f3198b7aa36edb6cfd (diff) | |
download | ydb-a222577a9fdf332653f853e10b061c51a7f3a76c.tar.gz |
[ymq] limit inflight of starting local leaders
init
init
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/ymq/actor/events.h | 4 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 9 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 201 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.h | 4 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.h | 4 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/large/__init__.py | 2 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/large/test_leader_start_inflight.py | 74 |
9 files changed, 257 insertions, 48 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f23b4750c8..65118fc7ff 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1020,6 +1020,7 @@ message TSqsConfig { optional TYcSearchEventsConfig YcSearchEventsConfig = 66; optional TYdbAuthConfig AuthConfig = 67; + optional uint64 StartLocalLeaderInflightMax = 71 [default = 500]; } message TConfigsDispatcherConfig { diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index f500e44391..6c856bd37e 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -134,6 +134,7 @@ struct TSqsEvents { EvForceReloadState, EvReloadStateRequest, EvReloadStateResponse, + EvLeaderStarted, EvEnd, }; @@ -966,6 +967,9 @@ struct TSqsEvents { Record.SetReloadedAtMs(reloadedAt.MilliSeconds()); } }; + + struct TEvLeaderStarted : public NActors::TEventLocal<TEvLeaderStarted, EvLeaderStarted> { + }; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 1de9b4290a..d04a0960a9 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -117,6 +117,8 @@ void TQueueLeader::BecomeWorking() { for (auto&& [reqIdAndShard, reqInfo] : ChangeMessageVisibilityRequests_) { ProcessChangeMessageVisibilityBatch(reqInfo); } + + Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvLeaderStarted()); } STATEFN(TQueueLeader::StateInit) { @@ -176,6 +178,10 @@ STATEFN(TQueueLeader::StateWorking) { void TQueueLeader::PassAway() { LOG_SQS_INFO("Queue " << TLogQueueName(UserName_, QueueName_) << " leader is dying"); + if (CurrentStateFunc() != &TThis::StateWorking) { + Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvLeaderStarted()); + } + for (auto& req : GetConfigurationRequests_) { AnswerFailed(req); } @@ -278,7 +284,6 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) { LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_)); Y_VERIFY(UpdateStateRequestStartedAt != TInstant::Zero()); - bool success = reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete; if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { using NKikimr::NClient::TValue; const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); @@ -314,7 +319,7 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) { new TSqsEvents::TEvReloadStateResponse( UserName_, QueueName_, - success ? UpdateStateRequestStartedAt : TInstant::Zero() + UpdateStateRequestStartedAt ) ); } diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index d6d6a790a2..3b8dc95a3f 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -101,17 +101,16 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { bool LeaderMustBeOnCurrentNode() const { return LeaderNodeId_ && LeaderNodeId_.value() == SelfId().NodeId(); } + bool NeedStartLocalLeader() const { + return !LocalLeader_ && (LocalLeaderRefCount_ > 0 || LeaderMustBeOnCurrentNode()); + } + + bool NeedStopLocalLeader() const { + return LocalLeader_ && LocalLeaderRefCount_ == 0 && !LeaderMustBeOnCurrentNode(); + } void SetLeaderNodeId(ui32 nodeId) { - if (LeaderNodeId_ && LeaderNodeId_ == nodeId) { - return; - } LeaderNodeId_ = nodeId; - if (LeaderMustBeOnCurrentNode()) { - StartLocalLeader(LEADER_CREATE_REASON_LOCAL_TABLET); - } else { - StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE); - } } void LocalLeaderWayMoved() const { @@ -121,54 +120,50 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { } void StartLocalLeader(const TString& reason) { - if (!LocalLeader_) { - Counters_ = Counters_->GetCountersForLeaderNode(); - LWPROBE(CreateLeader, UserName_, QueueName_, reason); - LocalLeader_ = TActivationContext::Register(new TQueueLeader( - UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, - SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization - )); - LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); - - if (FolderId_) { - Y_VERIFY(FolderCounters_); - FolderCounters_->InitCounters(); - INC_COUNTER(FolderCounters_, total_count); - } + Y_VERIFY(!LocalLeader_); + Counters_ = Counters_->GetCountersForLeaderNode(); + LWPROBE(CreateLeader, UserName_, QueueName_, reason); + LocalLeader_ = TActivationContext::Register(new TQueueLeader( + UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, + SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization + )); + LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); + + if (FolderId_) { + Y_VERIFY(FolderCounters_); + FolderCounters_->InitCounters(); + INC_COUNTER(FolderCounters_, total_count); } - } - void StopLocalLeaderIfNeeded(const TString& reason) { - if (!LeaderMustBeOnCurrentNode() && LocalLeaderRefCount_ == 0) { - StopLocalLeader(reason); + for (auto ev : GetConfigurationRequests_) { + TActivationContext::Send(ev->Forward(LocalLeader_)); } + GetConfigurationRequests_.clear(); } void StopLocalLeader(const TString& reason) { - if (LocalLeader_) { - Counters_ = Counters_->GetCountersForNotLeaderNode(); - LWPROBE(DestroyLeader, UserName_, QueueName_, reason); - LOG_SQS_INFO("Stop local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); - TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill())); - LocalLeader_ = TActorId(); - if (FolderId_) { - Y_VERIFY(FolderCounters_); - DEC_COUNTER(FolderCounters_, total_count); - } + Y_VERIFY(LocalLeader_); + Counters_ = Counters_->GetCountersForNotLeaderNode(); + LWPROBE(DestroyLeader, UserName_, QueueName_, reason); + LOG_SQS_INFO("Stop local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); + TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill())); + LocalLeader_ = TActorId(); + if (FolderId_) { + Y_VERIFY(FolderCounters_); + DEC_COUNTER(FolderCounters_, total_count); } } - void IncLocalLeaderRef(const TString& reason) { - StartLocalLeader(reason); + void IncLocalLeaderRef() { ++LocalLeaderRefCount_; } - void DecLocalLeaderRef(const TString& reason) { + void DecLocalLeaderRef() { Y_VERIFY(LocalLeaderRefCount_ > 0); --LocalLeaderRefCount_; - StopLocalLeaderIfNeeded(reason); } + TActorIdentity SelfId() const { return TActorIdentity(TActivationContext::AsActorContext().SelfID); } @@ -197,9 +192,112 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { // State machine THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; + TVector<TSqsEvents::TEvGetConfiguration::TPtr> GetConfigurationRequests_; TInstant NodeUnknownSince_ = TInstant::Now(); + }; +class TSqsService::TLocalLeaderManager { +public: + TLocalLeaderManager(TIntrusivePtr<TMonitoringCounters> counters) + : MaxInflight(Cfg().GetStartLocalLeaderInflightMax()) + , Counters(counters) + { + } + void QueueRemoved(TQueueInfoPtr queue); + void SetLeaderNodeId(TQueueInfoPtr queue, ui32 nodeId, TInstant now); + void IncLocalLeaderRef(TQueueInfoPtr queue, const TString& reason, TInstant now); + void DecLocalLeaderRef(TQueueInfoPtr queue, const TString& reason); + void LocalLeaderStarted(TInstant now); +private: + struct TAwaitingQueueInfo { + TAwaitingQueueInfo(TQueueInfoPtr queue, TInstant since, const TString& reason) + : Queue(queue) + , Since(since) + , Reason(reason) + {} + TQueueInfoPtr Queue; + TInstant Since; + const TString& Reason; + }; +private: + void TryStartLocalLeader(TQueueInfoPtr queue, const TString& reason, TInstant waitSince, TInstant now); + void ProcessAwaiting(TInstant now); + +private: + const ui64 MaxInflight; + ui64 Inflight = 0; + TDeque<TAwaitingQueueInfo> Awaiting; + THashSet<TQueueInfoPtr> AlreadyAwaiting; + TIntrusivePtr<TMonitoringCounters> Counters; +}; + + +void TSqsService::TLocalLeaderManager::QueueRemoved(TQueueInfoPtr queue) { + queue->LeaderNodeId_.reset(); + if (queue->NeedStopLocalLeader()) { + queue->StopLocalLeader(LEADER_DESTROY_REASON_REMOVE_INFO); + } +} + +void TSqsService::TLocalLeaderManager::SetLeaderNodeId(TQueueInfoPtr queue, ui32 nodeId, TInstant now) { + queue->SetLeaderNodeId(nodeId); + if (queue->NeedStartLocalLeader()) { + TryStartLocalLeader(queue, LEADER_CREATE_REASON_LOCAL_TABLET, now, now); + } else if (queue->NeedStopLocalLeader()) { + queue->StopLocalLeader(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE); + } +} + +void TSqsService::TLocalLeaderManager::IncLocalLeaderRef(TQueueInfoPtr queue, const TString& reason, TInstant now) { + queue->IncLocalLeaderRef(); + TryStartLocalLeader(queue, reason, now, now); +} + +void TSqsService::TLocalLeaderManager::TryStartLocalLeader(TQueueInfoPtr queue, const TString& reason, TInstant waitSince, TInstant now) { + if (queue->NeedStartLocalLeader()) { + if (MaxInflight != 0 && Inflight >= MaxInflight) { + if (!AlreadyAwaiting.count(queue)) { + LOG_SQS_DEBUG("Queue [" << queue->UserName_ << "/" << queue->QueueName_ << "] is waiting for the leader to start, inflight=" << Inflight); + Awaiting.emplace_back(queue, waitSince, reason); + AlreadyAwaiting.insert(queue); + } + } else { + ++Inflight; + queue->StartLocalLeader(reason); + Counters->LocalLeaderStartAwaitMs->Collect((now - waitSince).MilliSeconds()); + } + } + *Counters->LocalLeaderStartInflight = Inflight; + *Counters->LocalLeaderStartQueue = Awaiting.size(); +} + +void TSqsService::TLocalLeaderManager::DecLocalLeaderRef(TQueueInfoPtr queue, const TString& reason) { + queue->DecLocalLeaderRef(); + if (queue->NeedStopLocalLeader()) { + queue->StopLocalLeader(reason); + } +} + +void TSqsService::TLocalLeaderManager::LocalLeaderStarted(TInstant now) { + Y_VERIFY(Inflight > 0); + --Inflight; + + ProcessAwaiting(now); + + *Counters->LocalLeaderStartInflight = Inflight; + *Counters->LocalLeaderStartQueue = Awaiting.size(); +} + +void TSqsService::TLocalLeaderManager::ProcessAwaiting(TInstant now) { + while (!Awaiting.empty() && (MaxInflight == 0 || Inflight < MaxInflight)) { + auto info = Awaiting.front(); + Awaiting.pop_front(); + AlreadyAwaiting.erase(info.Queue); + TryStartLocalLeader(info.Queue, info.Reason, info.Since, now); + } +} + struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> { TUserInfo(TString userName, TIntrusivePtr<TUserCounters> userCounters) : UserName_(std::move(userName)) @@ -302,6 +400,8 @@ void TSqsService::Bootstrap() { InitSchemeCache(); NodeTrackerActor_ = Register(new TNodeTrackerActor(SchemeCache_)); + LocalLeaderManager = MakeHolder<TLocalLeaderManager>(MonitoringCounters_); + Register(new TCleanupQueueDataActor(MonitoringCounters_)); Register(new TMonitoringActor(MonitoringCounters_)); @@ -351,6 +451,8 @@ STATEFN(TSqsService::StateFunc) { hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest); hFunc(TSqsEvents::TEvInsertQueueCounters, HandleInsertQueueCounters); hFunc(TSqsEvents::TEvUserSettingsChanged, HandleUserSettingsChanged); + hFunc(TSqsEvents::TEvLeaderStarted, HandleLeaderStarted); + hFunc(TSqsEvents::TEvQueuesList, HandleQueuesList); default: LOG_SQS_ERROR("Unknown type of event came to SQS service actor: " << ev->Type << " (" << ev->GetTypeName() << "), sender: " << ev->Sender); @@ -595,7 +697,11 @@ void TSqsService::ProcessConfigurationRequestForQueue(TSqsEvents::TEvGetConfigur if (ev->Get()->Flags & TSqsEvents::TEvGetConfiguration::EFlags::NeedQueueLeader) { IncLocalLeaderRef(ev->Sender, queueInfo, LEADER_CREATE_REASON_USER_REQUEST); RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Forward configuration request to queue [" << queueInfo->UserName_ << "/" << queueInfo->QueueName_ << "] leader"); - TActivationContext::Send(ev->Forward(queueInfo->LocalLeader_)); + if (queueInfo->LocalLeader_) { + TActivationContext::Send(ev->Forward(queueInfo->LocalLeader_)); + } else { + queueInfo->GetConfigurationRequests_.emplace_back(std::move(ev)); + } } else { RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Answer configuration for queue [" << queueInfo->UserName_ << "/" << queueInfo->QueueName_ << "] without leader"); AnswerLeaderlessConfiguration(ev, userInfo, queueInfo); @@ -774,7 +880,7 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke auto& queue = *queuePtr; auto nodeId = ev->Get()->NodeId; bool disconnected = ev->Get()->Disconnected; - queue.SetLeaderNodeId(nodeId); + LocalLeaderManager->SetLeaderNodeId(queuePtr, nodeId, TActivationContext::Now()); if (disconnected) { queue.LocalLeaderWayMoved(); } @@ -790,6 +896,10 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke QueuesWithGetNodeWaitingRequests.erase(queuePtr); } +void TSqsService::HandleLeaderStarted(TSqsEvents::TEvLeaderStarted::TPtr&) { + LocalLeaderManager->LocalLeaderStarted(TActivationContext::Now()); +} + void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) { RequestingQueuesList_ = false; LastRequestQueuesListTime_ = TActivationContext::Now(); @@ -1168,8 +1278,7 @@ void TSqsService::CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo) { queueInfo->NodeTrackingSubscriptionId = 0; QueuePerNodeTrackingSubscription.erase(id); - queueInfo->LeaderNodeId_.reset(); - queueInfo->StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_REMOVE_INFO); + LocalLeaderManager->QueueRemoved(queueInfo); Send( NodeTrackerActor_, @@ -1299,7 +1408,7 @@ void TSqsService::IncLocalLeaderRef(const TActorId& referer, const TQueueInfoPtr const auto [iter, inserted] = LocalLeaderRefs_.emplace(referer, queueInfo); if (inserted) { LOG_SQS_TRACE("Inc local leader ref for actor " << referer); - queueInfo->IncLocalLeaderRef(reason); + LocalLeaderManager->IncLocalLeaderRef(queueInfo, reason, TActivationContext::Now()); } else { LWPROBE(IncLeaderRefAlreadyHasRef, queueInfo->UserName_, queueInfo->QueueName_, referer.ToString()); LOG_SQS_WARN("Inc local leader ref for actor " << referer << ". Ignore because this actor already presents in referers set"); @@ -1312,7 +1421,7 @@ void TSqsService::DecLocalLeaderRef(const TActorId& referer, const TString& reas LOG_SQS_TRACE("Dec local leader ref for actor " << referer << ". Found: " << (iter != LocalLeaderRefs_.end())); if (iter != LocalLeaderRefs_.end()) { auto queueInfo = iter->second; - queueInfo->DecLocalLeaderRef(reason); + LocalLeaderManager->DecLocalLeaderRef(queueInfo, reason); LocalLeaderRefs_.erase(iter); } else { LWPROBE(DecLeaderRefNotInRefSet, referer.ToString()); diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h index f06cbb7096..b7467a55f5 100644 --- a/ydb/core/ymq/actor/service.h +++ b/ydb/core/ymq/actor/service.h @@ -38,6 +38,8 @@ private: using TQueueInfoPtr = TIntrusivePtr<TQueueInfo>; + class TLocalLeaderManager; + STATEFN(StateFunc); void InitSchemeCache(); @@ -58,6 +60,7 @@ private: void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev); void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev); void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev); + void HandleLeaderStarted(TSqsEvents::TEvLeaderStarted::TPtr& ev); void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev); void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo); void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo); @@ -176,6 +179,7 @@ private: TDuration RescanInterval = TDuration::Minutes(1); }; TYcSearchEventsConfig YcSearchEventsConfig; + THolder<TLocalLeaderManager> LocalLeaderManager; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp index 87061d8d73..b3d4a60b59 100644 --- a/ydb/core/ymq/base/counters.cpp +++ b/ydb/core/ymq/base/counters.cpp @@ -1017,6 +1017,12 @@ void TMonitoringCounters::InitCounters() { INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesDone, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesRows, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesErrors, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); + + + INIT_COUNTER(MonitoringCounters, LocalLeaderStartInflight, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); + INIT_COUNTER(MonitoringCounters, LocalLeaderStartQueue, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); + + INIT_HISTOGRAM_COUNTER(MonitoringCounters, LocalLeaderStartAwaitMs, ELifetime::Expiring, DurationBucketsMs, ELaziness::OnDemand); } } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h index de94cb13b4..4c8ea01854 100644 --- a/ydb/core/ymq/base/counters.h +++ b/ydb/core/ymq/base/counters.h @@ -865,6 +865,10 @@ struct TMonitoringCounters : public TAtomicRefCount<TMonitoringCounters> { TLazyCachedCounter CleanupRemovedQueuesRows; TLazyCachedCounter CleanupRemovedQueuesErrors; + TLazyCachedCounter LocalLeaderStartInflight; + TLazyCachedCounter LocalLeaderStartQueue; + TLazyCachedHistogram LocalLeaderStartAwaitMs; + TMonitoringCounters(const NKikimrConfig::TSqsConfig& config, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& monitoringCounters) : MonitoringCounters(monitoringCounters) , Config(config) diff --git a/ydb/tests/functional/sqs/large/__init__.py b/ydb/tests/functional/sqs/large/__init__.py new file mode 100644 index 0000000000..faa18be5bb --- /dev/null +++ b/ydb/tests/functional/sqs/large/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- diff --git a/ydb/tests/functional/sqs/large/test_leader_start_inflight.py b/ydb/tests/functional/sqs/large/test_leader_start_inflight.py new file mode 100644 index 0000000000..9e74ae3e31 --- /dev/null +++ b/ydb/tests/functional/sqs/large/test_leader_start_inflight.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import logging +import time + +import pytest + +from ydb.tests.library.common.types import Erasure +from ydb.tests.library.sqs.test_base import KikimrSqsTestBase, IS_FIFO_PARAMS, TABLES_FORMAT_PARAMS + + +class TestSqsMultinodeCluster(KikimrSqsTestBase): + erasure = Erasure.BLOCK_4_2 + use_in_memory_pdisks = False + + @classmethod + def _setup_config_generator(cls): + config_generator = super(TestSqsMultinodeCluster, cls)._setup_config_generator() + config_generator.yaml_config['sqs_config']['masters_describer_update_time_ms'] = 1000 + config_generator.yaml_config['sqs_config']['background_metrics_update_time_ms'] = 1000 + config_generator.yaml_config['sqs_config']['start_local_leader_inflight_max'] = 1 + config_generator.yaml_config['sqs_config']['account_settings_defaults'] = {'max_queues_count': 50000} + return config_generator + + def get_leaders_per_nodes(self): + nodes = len(self.cluster.nodes) + leaders = [] + for node_index in range(nodes): + counters = self._get_counters(node_index, 'utils', counters_format='json', dump_to_log=False) + labels = { + 'activity': 'SQS_QUEUE_LEADER_ACTOR', + 'sensor': 'ActorsAliveByActivity' + } + leader_actors = self._get_counter(counters, labels) + leaders.append(leader_actors['value'] if leader_actors else 0) + return leaders + + @pytest.mark.parametrize(**IS_FIFO_PARAMS) + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_limit_leader_start_inflight(self, is_fifo, tables_format): + self._init_with_params(is_fifo, tables_format) + + queues = [] + for i in range(100): + queues.append(self._create_queue_and_assert(f'{i}_{self.queue_name}', is_fifo=is_fifo)) + + def send_messages(): + for q in queues: + self.seq_no += 1 + seq_no = self.seq_no if is_fifo else None + group_id = 'group' if is_fifo else None + self._send_message_and_assert(q, f'test_send_message for {q}', seq_no=seq_no, group_id=group_id) + + send_messages() + + while True: + for node_index in range(len(self.cluster.nodes))[1:]: + self._kick_tablets_from_node(node_index) + leaders = self.get_leaders_per_nodes() + logging.info(f'started leaders {leaders}, expected {len(queues)} only on node=0.') + if sum(leaders) == len(queues) and leaders[0] == len(queues): + break + time.sleep(5) + + self._kick_tablets_from_node(0) + self._enable_tablets_on_node(1) + + while True: + leaders = self.get_leaders_per_nodes() + logging.info(f'started leaders {leaders}, expected {len(queues)} only on node=1.') + if sum(leaders) == len(queues) and leaders[1] == len(queues): + break + time.sleep(5) + send_messages() |