about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: alexbogo <alexbogo@ydb.tech> 2023-06-02 16:37:41 +0300
committer: alexbogo <alexbogo@ydb.tech> 2023-06-02 16:37:41 +0300
commit: a222577a9fdf332653f853e10b061c51a7f3a76c (patch)
tree: 9b80fdcca8e6650eeda9c0057e3a0212254159d9
parent: a080f3314e29c6dcae3c36f3198b7aa36edb6cfd (diff)
download: ydb-a222577a9fdf332653f853e10b061c51a7f3a76c.tar.gz
[ymq] limit inflight of starting local leaders
init init
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/ymq/actor/events.h 4
-rw-r--r-- ydb/core/ymq/actor/queue_leader.cpp 9
-rw-r--r-- ydb/core/ymq/actor/service.cpp 201
-rw-r--r-- ydb/core/ymq/actor/service.h 4
-rw-r--r-- ydb/core/ymq/base/counters.cpp 6
-rw-r--r-- ydb/core/ymq/base/counters.h 4
-rw-r--r-- ydb/tests/functional/sqs/large/__init__.py 2
-rw-r--r-- ydb/tests/functional/sqs/large/test_leader_start_inflight.py 74
9 files changed, 257 insertions, 48 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index f23b4750c8..65118fc7ff 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1020,6 +1020,7 @@ message TSqsConfig {
optional TYcSearchEventsConfig YcSearchEventsConfig = 66;
optional TYdbAuthConfig AuthConfig = 67;
+ optional uint64 StartLocalLeaderInflightMax = 71 [default = 500];
}
message TConfigsDispatcherConfig {
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index f500e44391..6c856bd37e 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -134,6 +134,7 @@ struct TSqsEvents {
EvForceReloadState,
EvReloadStateRequest,
EvReloadStateResponse,
+ EvLeaderStarted,
EvEnd,
};
@@ -966,6 +967,9 @@ struct TSqsEvents {
Record.SetReloadedAtMs(reloadedAt.MilliSeconds());
}
};
+
+    // Payload-free local event. In this patch a queue leader sends it to the SQS
+    // service of its own node from BecomeWorking(), and from PassAway() when the
+    // leader dies before reaching StateWorking.
+    struct TEvLeaderStarted
+        : public NActors::TEventLocal<TEvLeaderStarted, EvLeaderStarted>
+    {
+    };
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 1de9b4290a..d04a0960a9 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -117,6 +117,8 @@ void TQueueLeader::BecomeWorking() {
for (auto&& [reqIdAndShard, reqInfo] : ChangeMessageVisibilityRequests_) {
ProcessChangeMessageVisibilityBatch(reqInfo);
}
+
+ Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvLeaderStarted());
}
STATEFN(TQueueLeader::StateInit) {
@@ -176,6 +178,10 @@ STATEFN(TQueueLeader::StateWorking) {
void TQueueLeader::PassAway() {
LOG_SQS_INFO("Queue " << TLogQueueName(UserName_, QueueName_) << " leader is dying");
+ if (CurrentStateFunc() != &TThis::StateWorking) {
+ Send(MakeSqsServiceID(SelfId().NodeId()), new TSqsEvents::TEvLeaderStarted());
+ }
+
for (auto& req : GetConfigurationRequests_) {
AnswerFailed(req);
}
@@ -278,7 +284,6 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) {
LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_));
Y_VERIFY(UpdateStateRequestStartedAt != TInstant::Zero());
- bool success = reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete;
if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
using NKikimr::NClient::TValue;
const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
@@ -314,7 +319,7 @@ void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) {
new TSqsEvents::TEvReloadStateResponse(
UserName_,
QueueName_,
- success ? UpdateStateRequestStartedAt : TInstant::Zero()
+ UpdateStateRequestStartedAt
)
);
}
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index d6d6a790a2..3b8dc95a3f 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -101,17 +101,16 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
bool LeaderMustBeOnCurrentNode() const {
return LeaderNodeId_ && LeaderNodeId_.value() == SelfId().NodeId();
}
+    // True when no local leader actor is registered although one is wanted:
+    // either somebody holds a reference to it, or the leader must live on this node.
+    bool NeedStartLocalLeader() const {
+        if (LocalLeader_) {
+            return false;
+        }
+        return LocalLeaderRefCount_ > 0 || LeaderMustBeOnCurrentNode();
+    }
+
+    // True when a local leader actor is registered but nothing requires it any more:
+    // nobody references it and the leader is not assigned to this node.
+    bool NeedStopLocalLeader() const {
+        if (!LocalLeader_) {
+            return false;
+        }
+        const bool unreferenced = LocalLeaderRefCount_ == 0;
+        return unreferenced && !LeaderMustBeOnCurrentNode();
+    }
void SetLeaderNodeId(ui32 nodeId) {
- if (LeaderNodeId_ && LeaderNodeId_ == nodeId) {
- return;
- }
LeaderNodeId_ = nodeId;
- if (LeaderMustBeOnCurrentNode()) {
- StartLocalLeader(LEADER_CREATE_REASON_LOCAL_TABLET);
- } else {
- StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE);
- }
}
void LocalLeaderWayMoved() const {
@@ -121,54 +120,50 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
}
void StartLocalLeader(const TString& reason) {
- if (!LocalLeader_) {
- Counters_ = Counters_->GetCountersForLeaderNode();
- LWPROBE(CreateLeader, UserName_, QueueName_, reason);
- LocalLeader_ = TActivationContext::Register(new TQueueLeader(
- UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_,
- SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization
- ));
- LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
-
- if (FolderId_) {
- Y_VERIFY(FolderCounters_);
- FolderCounters_->InitCounters();
- INC_COUNTER(FolderCounters_, total_count);
- }
+ Y_VERIFY(!LocalLeader_);
+ Counters_ = Counters_->GetCountersForLeaderNode();
+ LWPROBE(CreateLeader, UserName_, QueueName_, reason);
+ LocalLeader_ = TActivationContext::Register(new TQueueLeader(
+ UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_,
+ SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization
+ ));
+ LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
+
+ if (FolderId_) {
+ Y_VERIFY(FolderCounters_);
+ FolderCounters_->InitCounters();
+ INC_COUNTER(FolderCounters_, total_count);
}
- }
- void StopLocalLeaderIfNeeded(const TString& reason) {
- if (!LeaderMustBeOnCurrentNode() && LocalLeaderRefCount_ == 0) {
- StopLocalLeader(reason);
+ for (auto ev : GetConfigurationRequests_) {
+ TActivationContext::Send(ev->Forward(LocalLeader_));
}
+ GetConfigurationRequests_.clear();
}
void StopLocalLeader(const TString& reason) {
- if (LocalLeader_) {
- Counters_ = Counters_->GetCountersForNotLeaderNode();
- LWPROBE(DestroyLeader, UserName_, QueueName_, reason);
- LOG_SQS_INFO("Stop local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
- TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill()));
- LocalLeader_ = TActorId();
- if (FolderId_) {
- Y_VERIFY(FolderCounters_);
- DEC_COUNTER(FolderCounters_, total_count);
- }
+ Y_VERIFY(LocalLeader_);
+ Counters_ = Counters_->GetCountersForNotLeaderNode();
+ LWPROBE(DestroyLeader, UserName_, QueueName_, reason);
+ LOG_SQS_INFO("Stop local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
+ TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill()));
+ LocalLeader_ = TActorId();
+ if (FolderId_) {
+ Y_VERIFY(FolderCounters_);
+ DEC_COUNTER(FolderCounters_, total_count);
}
}
- void IncLocalLeaderRef(const TString& reason) {
- StartLocalLeader(reason);
+ void IncLocalLeaderRef() {
++LocalLeaderRefCount_;
}
- void DecLocalLeaderRef(const TString& reason) {
+ void DecLocalLeaderRef() {
Y_VERIFY(LocalLeaderRefCount_ > 0);
--LocalLeaderRefCount_;
- StopLocalLeaderIfNeeded(reason);
}
+
TActorIdentity SelfId() const {
return TActorIdentity(TActivationContext::AsActorContext().SelfID);
}
@@ -197,9 +192,112 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
// State machine
THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_;
+ TVector<TSqsEvents::TEvGetConfiguration::TPtr> GetConfigurationRequests_;
TInstant NodeUnknownSince_ = TInstant::Now();
+
};
+// Limits how many local queue leaders may be starting at the same time.
+//
+// A leader counts as "inflight" from the moment it is started until
+// LocalLeaderStarted() is called for it. When MaxInflight (taken from
+// StartLocalLeaderInflightMax in the SQS config) is reached, further queues are
+// parked in a FIFO and started as slots free up. MaxInflight == 0 disables the limit.
+class TSqsService::TLocalLeaderManager {
+public:
+    explicit TLocalLeaderManager(TIntrusivePtr<TMonitoringCounters> counters)
+        : MaxInflight(Cfg().GetStartLocalLeaderInflightMax())
+        , Counters(std::move(counters))
+    {
+    }
+
+    // Drops the queue's leader node and stops its local leader if it is no longer needed.
+    void QueueRemoved(TQueueInfoPtr queue);
+    // Records the new leader node and starts or stops the local leader accordingly.
+    void SetLeaderNodeId(TQueueInfoPtr queue, ui32 nodeId, TInstant now);
+    // Adds a reference to the local leader, starting (or enqueueing) it if needed.
+    void IncLocalLeaderRef(TQueueInfoPtr queue, const TString& reason, TInstant now);
+    // Releases a reference to the local leader, stopping it if it is no longer needed.
+    void DecLocalLeaderRef(TQueueInfoPtr queue, const TString& reason);
+    // Frees one inflight slot and starts waiting queues while slots are available.
+    void LocalLeaderStarted(TInstant now);
+
+private:
+    // A queue parked until an inflight slot becomes available.
+    struct TAwaitingQueueInfo {
+        TAwaitingQueueInfo(TQueueInfoPtr queue, TInstant since, const TString& reason)
+            : Queue(queue)
+            , Since(since)
+            , Reason(reason)
+        {}
+
+        TQueueInfoPtr Queue;
+        TInstant Since;  // when the queue first asked for a leader
+        // Stored by value: the entry outlives the call that enqueued it, so a
+        // reference here would dangle if a caller ever passed a temporary.
+        TString Reason;
+    };
+
+private:
+    void TryStartLocalLeader(TQueueInfoPtr queue, const TString& reason, TInstant waitSince, TInstant now);
+    void ProcessAwaiting(TInstant now);
+
+private:
+    const ui64 MaxInflight;
+    ui64 Inflight = 0;
+    TDeque<TAwaitingQueueInfo> Awaiting;
+    THashSet<TQueueInfoPtr> AlreadyAwaiting;  // mirrors Awaiting, prevents duplicate entries
+    TIntrusivePtr<TMonitoringCounters> Counters;
+};
+
+
+// Forgets the leader node of a removed queue and stops its local leader
+// unless something still requires it.
+void TSqsService::TLocalLeaderManager::QueueRemoved(TQueueInfoPtr queue) {
+    auto& info = *queue;
+    info.LeaderNodeId_.reset();
+    if (!info.NeedStopLocalLeader()) {
+        return;
+    }
+    info.StopLocalLeader(LEADER_DESTROY_REASON_REMOVE_INFO);
+}
+
+// Stores the node that hosts the queue leader, then either tries to start the
+// local leader (subject to the inflight limit) or stops it, whichever is needed.
+void TSqsService::TLocalLeaderManager::SetLeaderNodeId(TQueueInfoPtr queue, ui32 nodeId, TInstant now) {
+    queue->SetLeaderNodeId(nodeId);
+
+    if (queue->NeedStartLocalLeader()) {
+        TryStartLocalLeader(queue, LEADER_CREATE_REASON_LOCAL_TABLET, now, now);
+        return;
+    }
+
+    if (queue->NeedStopLocalLeader()) {
+        queue->StopLocalLeader(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE);
+    }
+}
+
+// Takes one more reference to the queue's local leader and tries to start the
+// leader; the waiting time (if the start gets delayed) is measured from `now`.
+void TSqsService::TLocalLeaderManager::IncLocalLeaderRef(TQueueInfoPtr queue, const TString& reason, TInstant now) {
+    queue->IncLocalLeaderRef();
+    const TInstant waitSince = now;
+    TryStartLocalLeader(queue, reason, waitSince, now);
+}
+
+// Starts the queue's local leader if it is needed and an inflight slot is free;
+// otherwise parks the queue (once) in the waiting FIFO. The inflight and
+// queue-length counters are refreshed on every call.
+void TSqsService::TLocalLeaderManager::TryStartLocalLeader(TQueueInfoPtr queue, const TString& reason, TInstant waitSince, TInstant now) {
+    if (queue->NeedStartLocalLeader()) {
+        // MaxInflight == 0 means "no limit".
+        const bool limitReached = MaxInflight != 0 && Inflight >= MaxInflight;
+        if (!limitReached) {
+            ++Inflight;
+            queue->StartLocalLeader(reason);
+            const auto waited = now - waitSince;
+            Counters->LocalLeaderStartAwaitMs->Collect(waited.MilliSeconds());
+        } else if (AlreadyAwaiting.count(queue) == 0) {
+            LOG_SQS_DEBUG("Queue [" << queue->UserName_ << "/" << queue->QueueName_ << "] is waiting for the leader to start, inflight=" << Inflight);
+            Awaiting.emplace_back(queue, waitSince, reason);
+            AlreadyAwaiting.insert(queue);
+        }
+    }
+
+    *Counters->LocalLeaderStartInflight = Inflight;
+    *Counters->LocalLeaderStartQueue = Awaiting.size();
+}
+
+// Releases one reference to the queue's local leader and stops the leader
+// when nothing requires it any more.
+void TSqsService::TLocalLeaderManager::DecLocalLeaderRef(TQueueInfoPtr queue, const TString& reason) {
+    auto& info = *queue;
+    info.DecLocalLeaderRef();
+    if (!info.NeedStopLocalLeader()) {
+        return;
+    }
+    info.StopLocalLeader(reason);
+}
+
+// Releases the inflight slot held by a leader and lets waiting queues use the
+// freed capacity, then refreshes the monitoring counters.
+void TSqsService::TLocalLeaderManager::LocalLeaderStarted(TInstant now) {
+    Y_VERIFY(Inflight > 0);
+    Inflight -= 1;
+
+    ProcessAwaiting(now);
+
+    *Counters->LocalLeaderStartQueue = Awaiting.size();
+    *Counters->LocalLeaderStartInflight = Inflight;
+}
+
+// Starts parked queues in FIFO order while inflight slots are available
+// (MaxInflight == 0 means there is no limit).
+void TSqsService::TLocalLeaderManager::ProcessAwaiting(TInstant now) {
+    while (!Awaiting.empty() && (MaxInflight == 0 || Inflight < MaxInflight)) {
+        // Move the entry out instead of copying it: the slot is popped right away,
+        // so a copy would only cost an extra ref-count round trip on the queue pointer.
+        auto info = std::move(Awaiting.front());
+        Awaiting.pop_front();
+        AlreadyAwaiting.erase(info.Queue);
+        // The queue may no longer need a leader; TryStartLocalLeader re-checks that.
+        TryStartLocalLeader(info.Queue, info.Reason, info.Since, now);
+    }
+}
+
struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
TUserInfo(TString userName, TIntrusivePtr<TUserCounters> userCounters)
: UserName_(std::move(userName))
@@ -302,6 +400,8 @@ void TSqsService::Bootstrap() {
InitSchemeCache();
NodeTrackerActor_ = Register(new TNodeTrackerActor(SchemeCache_));
+ LocalLeaderManager = MakeHolder<TLocalLeaderManager>(MonitoringCounters_);
+
Register(new TCleanupQueueDataActor(MonitoringCounters_));
Register(new TMonitoringActor(MonitoringCounters_));
@@ -351,6 +451,8 @@ STATEFN(TSqsService::StateFunc) {
hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest);
hFunc(TSqsEvents::TEvInsertQueueCounters, HandleInsertQueueCounters);
hFunc(TSqsEvents::TEvUserSettingsChanged, HandleUserSettingsChanged);
+ hFunc(TSqsEvents::TEvLeaderStarted, HandleLeaderStarted);
+
hFunc(TSqsEvents::TEvQueuesList, HandleQueuesList);
default:
LOG_SQS_ERROR("Unknown type of event came to SQS service actor: " << ev->Type << " (" << ev->GetTypeName() << "), sender: " << ev->Sender);
@@ -595,7 +697,11 @@ void TSqsService::ProcessConfigurationRequestForQueue(TSqsEvents::TEvGetConfigur
if (ev->Get()->Flags & TSqsEvents::TEvGetConfiguration::EFlags::NeedQueueLeader) {
IncLocalLeaderRef(ev->Sender, queueInfo, LEADER_CREATE_REASON_USER_REQUEST);
RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Forward configuration request to queue [" << queueInfo->UserName_ << "/" << queueInfo->QueueName_ << "] leader");
- TActivationContext::Send(ev->Forward(queueInfo->LocalLeader_));
+ if (queueInfo->LocalLeader_) {
+ TActivationContext::Send(ev->Forward(queueInfo->LocalLeader_));
+ } else {
+ queueInfo->GetConfigurationRequests_.emplace_back(std::move(ev));
+ }
} else {
RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Answer configuration for queue [" << queueInfo->UserName_ << "/" << queueInfo->QueueName_ << "] without leader");
AnswerLeaderlessConfiguration(ev, userInfo, queueInfo);
@@ -774,7 +880,7 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke
auto& queue = *queuePtr;
auto nodeId = ev->Get()->NodeId;
bool disconnected = ev->Get()->Disconnected;
- queue.SetLeaderNodeId(nodeId);
+ LocalLeaderManager->SetLeaderNodeId(queuePtr, nodeId, TActivationContext::Now());
if (disconnected) {
queue.LocalLeaderWayMoved();
}
@@ -790,6 +896,10 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke
QueuesWithGetNodeWaitingRequests.erase(queuePtr);
}
+// A local leader reported TEvLeaderStarted: pass the notification, stamped
+// with the current time, to the leader manager.
+void TSqsService::HandleLeaderStarted(TSqsEvents::TEvLeaderStarted::TPtr&) {
+    const TInstant now = TActivationContext::Now();
+    LocalLeaderManager->LocalLeaderStarted(now);
+}
+
void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
RequestingQueuesList_ = false;
LastRequestQueuesListTime_ = TActivationContext::Now();
@@ -1168,8 +1278,7 @@ void TSqsService::CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo) {
queueInfo->NodeTrackingSubscriptionId = 0;
QueuePerNodeTrackingSubscription.erase(id);
- queueInfo->LeaderNodeId_.reset();
- queueInfo->StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_REMOVE_INFO);
+ LocalLeaderManager->QueueRemoved(queueInfo);
Send(
NodeTrackerActor_,
@@ -1299,7 +1408,7 @@ void TSqsService::IncLocalLeaderRef(const TActorId& referer, const TQueueInfoPtr
const auto [iter, inserted] = LocalLeaderRefs_.emplace(referer, queueInfo);
if (inserted) {
LOG_SQS_TRACE("Inc local leader ref for actor " << referer);
- queueInfo->IncLocalLeaderRef(reason);
+ LocalLeaderManager->IncLocalLeaderRef(queueInfo, reason, TActivationContext::Now());
} else {
LWPROBE(IncLeaderRefAlreadyHasRef, queueInfo->UserName_, queueInfo->QueueName_, referer.ToString());
LOG_SQS_WARN("Inc local leader ref for actor " << referer << ". Ignore because this actor already presents in referers set");
@@ -1312,7 +1421,7 @@ void TSqsService::DecLocalLeaderRef(const TActorId& referer, const TString& reas
LOG_SQS_TRACE("Dec local leader ref for actor " << referer << ". Found: " << (iter != LocalLeaderRefs_.end()));
if (iter != LocalLeaderRefs_.end()) {
auto queueInfo = iter->second;
- queueInfo->DecLocalLeaderRef(reason);
+ LocalLeaderManager->DecLocalLeaderRef(queueInfo, reason);
LocalLeaderRefs_.erase(iter);
} else {
LWPROBE(DecLeaderRefNotInRefSet, referer.ToString());
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index f06cbb7096..b7467a55f5 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -38,6 +38,8 @@ private:
using TQueueInfoPtr = TIntrusivePtr<TQueueInfo>;
+ class TLocalLeaderManager;
+
STATEFN(StateFunc);
void InitSchemeCache();
@@ -58,6 +60,7 @@ private:
void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev);
void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev);
void HandleReloadStateRequest(TSqsEvents::TEvReloadStateRequest::TPtr& ev);
+ void HandleLeaderStarted(TSqsEvents::TEvLeaderStarted::TPtr& ev);
void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev);
void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo);
void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo);
@@ -176,6 +179,7 @@ private:
TDuration RescanInterval = TDuration::Minutes(1);
};
TYcSearchEventsConfig YcSearchEventsConfig;
+ THolder<TLocalLeaderManager> LocalLeaderManager;
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp
index 87061d8d73..b3d4a60b59 100644
--- a/ydb/core/ymq/base/counters.cpp
+++ b/ydb/core/ymq/base/counters.cpp
@@ -1017,6 +1017,12 @@ void TMonitoringCounters::InitCounters() {
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesDone, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesRows, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesErrors, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
+
+
+ INIT_COUNTER(MonitoringCounters, LocalLeaderStartInflight, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
+ INIT_COUNTER(MonitoringCounters, LocalLeaderStartQueue, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
+
+ INIT_HISTOGRAM_COUNTER(MonitoringCounters, LocalLeaderStartAwaitMs, ELifetime::Expiring, DurationBucketsMs, ELaziness::OnDemand);
}
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h
index de94cb13b4..4c8ea01854 100644
--- a/ydb/core/ymq/base/counters.h
+++ b/ydb/core/ymq/base/counters.h
@@ -865,6 +865,10 @@ struct TMonitoringCounters : public TAtomicRefCount<TMonitoringCounters> {
TLazyCachedCounter CleanupRemovedQueuesRows;
TLazyCachedCounter CleanupRemovedQueuesErrors;
+ TLazyCachedCounter LocalLeaderStartInflight;
+ TLazyCachedCounter LocalLeaderStartQueue;
+ TLazyCachedHistogram LocalLeaderStartAwaitMs;
+
TMonitoringCounters(const NKikimrConfig::TSqsConfig& config, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& monitoringCounters)
: MonitoringCounters(monitoringCounters)
, Config(config)
diff --git a/ydb/tests/functional/sqs/large/__init__.py b/ydb/tests/functional/sqs/large/__init__.py
new file mode 100644
index 0000000000..faa18be5bb
--- /dev/null
+++ b/ydb/tests/functional/sqs/large/__init__.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
diff --git a/ydb/tests/functional/sqs/large/test_leader_start_inflight.py b/ydb/tests/functional/sqs/large/test_leader_start_inflight.py
new file mode 100644
index 0000000000..9e74ae3e31
--- /dev/null
+++ b/ydb/tests/functional/sqs/large/test_leader_start_inflight.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+import pytest
+
+from ydb.tests.library.common.types import Erasure
+from ydb.tests.library.sqs.test_base import KikimrSqsTestBase, IS_FIFO_PARAMS, TABLES_FORMAT_PARAMS
+
+
+class TestSqsMultinodeCluster(KikimrSqsTestBase):
+    """Checks that all queue leaders still start when only one leader may be starting at a time."""
+    erasure = Erasure.BLOCK_4_2
+    use_in_memory_pdisks = False
+
+    @classmethod
+    def _setup_config_generator(cls):
+        config_generator = super(TestSqsMultinodeCluster, cls)._setup_config_generator()
+        sqs_config = config_generator.yaml_config['sqs_config']
+        sqs_config['masters_describer_update_time_ms'] = 1000
+        sqs_config['background_metrics_update_time_ms'] = 1000
+        # The limit under test: at most one local leader may be starting at once.
+        sqs_config['start_local_leader_inflight_max'] = 1
+        sqs_config['account_settings_defaults'] = {'max_queues_count': 50000}
+        return config_generator
+
+    def get_leaders_per_nodes(self):
+        """Return, for every node index, the number of alive SQS queue leader actors."""
+        leaders = []
+        for node_index in range(len(self.cluster.nodes)):
+            counters = self._get_counters(node_index, 'utils', counters_format='json', dump_to_log=False)
+            labels = {
+                'activity': 'SQS_QUEUE_LEADER_ACTOR',
+                'sensor': 'ActorsAliveByActivity'
+            }
+            leader_actors = self._get_counter(counters, labels)
+            leaders.append(leader_actors['value'] if leader_actors else 0)
+        return leaders
+
+    def _wait_all_leaders_on_node(self, node_index, expected, before_poll=None):
+        """Poll every 5 seconds until exactly `expected` leaders are alive and all of them are on `node_index`.
+
+        `before_poll`, when given, is called at the start of every polling iteration.
+        NOTE(review): the loop has no deadline of its own; presumably the test runner's timeout bounds it.
+        """
+        while True:
+            if before_poll is not None:
+                before_poll()
+            leaders = self.get_leaders_per_nodes()
+            logging.info(f'started leaders {leaders}, expected {expected} only on node={node_index}.')
+            if sum(leaders) == expected and leaders[node_index] == expected:
+                break
+            time.sleep(5)
+
+    @pytest.mark.parametrize(**IS_FIFO_PARAMS)
+    @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+    def test_limit_leader_start_inflight(self, is_fifo, tables_format):
+        self._init_with_params(is_fifo, tables_format)
+
+        queues = []
+        for i in range(100):
+            queues.append(self._create_queue_and_assert(f'{i}_{self.queue_name}', is_fifo=is_fifo))
+
+        def send_messages():
+            for q in queues:
+                self.seq_no += 1
+                seq_no = self.seq_no if is_fifo else None
+                group_id = 'group' if is_fifo else None
+                self._send_message_and_assert(q, f'test_send_message for {q}', seq_no=seq_no, group_id=group_id)
+
+        def kick_tablets_from_other_nodes():
+            # Every node except node 0.
+            for node_index in range(1, len(self.cluster.nodes)):
+                self._kick_tablets_from_node(node_index)
+
+        send_messages()
+
+        # Phase 1: all leaders must end up on node 0.
+        self._wait_all_leaders_on_node(0, len(queues), before_poll=kick_tablets_from_other_nodes)
+
+        self._kick_tablets_from_node(0)
+        self._enable_tablets_on_node(1)
+
+        # Phase 2: all leaders must restart on node 1.
+        self._wait_all_leaders_on_node(1, len(queues))
+        send_messages()