aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author alexbogo <alexbogo@ydb.tech> 2022-07-14 16:15:10 +0300
committer alexbogo <alexbogo@ydb.tech> 2022-07-14 16:15:10 +0300
commit e49b95a764c9bc6ecae8c27be33e68a226036049 (patch)
tree c087f5b6676df5f0591d4d634c84577d7b7d16aa
parent f9a769cdfc6999a2fe5d0da2e20c40af02bbb5f4 (diff)
download ydb-e49b95a764c9bc6ecae8c27be33e68a226036049.tar.gz
[sqs] balancing leaders
init
-rw-r--r--ydb/core/ymq/actor/CMakeLists.txt1
-rw-r--r--ydb/core/ymq/actor/events.h43
-rw-r--r--ydb/core/ymq/actor/node_tracker.cpp394
-rw-r--r--ydb/core/ymq/actor/node_tracker.h121
-rw-r--r--ydb/core/ymq/actor/queues_list_reader.cpp1
-rw-r--r--ydb/core/ymq/actor/service.cpp250
-rw-r--r--ydb/core/ymq/actor/service.h12
-rw-r--r--ydb/core/ymq/queues/std/queries.cpp1
-rw-r--r--ydb/tests/functional/sqs/common/test_queues_managing.py2
-rw-r--r--ydb/tests/functional/sqs/merge_split_common_table/__init__.py0
-rw-r--r--ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py6
-rw-r--r--ydb/tests/functional/sqs/merge_split_common_table/std/test.py6
-rw-r--r--ydb/tests/functional/sqs/merge_split_common_table/test.py132
-rw-r--r--ydb/tests/library/sqs/tables.py35
-rw-r--r--ydb/tests/library/sqs/test_base.py2
15 files changed, 889 insertions, 117 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt
index 329ab01f408..623ced76fe8 100644
--- a/ydb/core/ymq/actor/CMakeLists.txt
+++ b/ydb/core/ymq/actor/CMakeLists.txt
@@ -84,6 +84,7 @@ target_sources(core-ymq-actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/metering.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/migration.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/modify_permissions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/node_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/proxy_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/purge.cpp
${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/purge_queue.cpp
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 71bfaf13e7e..b0e631b6e21 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -126,6 +126,10 @@ struct TSqsEvents {
EvCleanupQueryComplete,
+ EvNodeTrackerSubscribeRequest,
+ EvNodeTrackerUnsubscribeRequest,
+ EvNodeTrackerSubscriptionStatus,
+
EvEnd,
};
@@ -801,6 +805,7 @@ struct TSqsEvents {
ui64 Version = 0;
ui64 ShardsCount = 0;
TInstant CreatedTimestamp;
+ bool IsFifo = false;
bool operator<(const TQueueRecord& r) const {
return std::tie(UserName, QueueName) < std::tie(r.UserName, r.QueueName);
@@ -890,6 +895,44 @@ struct TSqsEvents {
TString Name;
ui64 Type;
};
+
+    // Local event asking the node tracker to start tracking a queue.
+    // SubscriptionId is echoed back in TEvNodeTrackerSubscriptionStatus.
+    // When TabletId is set the tracker uses it as the leader tablet directly;
+    // otherwise the tablet is looked up from QueueIdNumber and IsFifo.
+    struct TEvNodeTrackerSubscribeRequest
+        : public NActors::TEventLocal<TEvNodeTrackerSubscribeRequest, EvNodeTrackerSubscribeRequest>
+    {
+        explicit TEvNodeTrackerSubscribeRequest(
+            ui64 subscriptionId,
+            ui64 queueIdNumber,
+            bool isFifo,
+            std::optional<ui64> tabletId = std::nullopt
+        )
+            : SubscriptionId{subscriptionId}
+            , QueueIdNumber{queueIdNumber}
+            , IsFifo{isFifo}
+            , TabletId{tabletId}
+        {
+        }
+
+        ui64 SubscriptionId;           // caller-chosen id of the subscription
+        ui64 QueueIdNumber;            // numeric id of the queue
+        bool IsFifo;                   // selects the FIFO or the STD key ranges
+        std::optional<ui64> TabletId;  // explicitly specified leader tablet, if any
+    };
+
+    // Local event asking the node tracker to drop the subscription with the given id.
+    struct TEvNodeTrackerUnsubscribeRequest
+        : public NActors::TEventLocal<TEvNodeTrackerUnsubscribeRequest, EvNodeTrackerUnsubscribeRequest>
+    {
+        // explicit, like the sibling node tracker events: a bare ui64 must not
+        // silently convert into an unsubscribe request.
+        explicit TEvNodeTrackerUnsubscribeRequest(ui64 subscriptionId)
+            : SubscriptionId(subscriptionId)
+        {}
+        ui64 SubscriptionId;  // id previously passed in TEvNodeTrackerSubscribeRequest
+    };
+
+    // Local event sent by the node tracker to report the node resolved for a subscription.
+    struct TEvNodeTrackerSubscriptionStatus
+        : public NActors::TEventLocal<TEvNodeTrackerSubscriptionStatus, EvNodeTrackerSubscriptionStatus>
+    {
+        explicit TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId)
+            : SubscriptionId{subscriptionId}
+            , NodeId{nodeId}
+        {
+        }
+
+        ui64 SubscriptionId;  // id of the subscription the status belongs to
+        ui32 NodeId;          // node id taken from the tablet pipe server actor
+    };
};
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/node_tracker.cpp b/ydb/core/ymq/actor/node_tracker.cpp
new file mode 100644
index 00000000000..eab06da69ae
--- /dev/null
+++ b/ydb/core/ymq/actor/node_tracker.cpp
@@ -0,0 +1,394 @@
+#include "node_tracker.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/ymq/actor/cfg.h>
+#include <ydb/core/ymq/actor/serviceid.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
+
+#include <util/string/vector.h>
+
+namespace {
+    // Reads a ui64 key component from a cell; a null cell maps to Max<ui64>().
+    ui64 GetKeyCellValue(const NKikimr::TCell& cell) {
+        if (cell.IsNull()) {
+            return Max<ui64>();
+        }
+        return cell.AsValue<ui64>();
+    }
+
+    // Extracts the first two key components of a range end key.
+    // A missing component is treated exactly like a null one (Max<ui64>()),
+    // so an empty key yields {Max, Max} and a one-cell key yields {value, Max}
+    // instead of reading past the end of the array.
+    std::tuple<ui64, ui64> GetKeysPrefix(const TConstArrayRef<NKikimr::TCell>& cells) {
+        const ui64 first = cells.size() > 0 ? GetKeyCellValue(cells[0]) : Max<ui64>();
+        const ui64 second = cells.size() > 1 ? GetKeyCellValue(cells[1]) : Max<ui64>();
+        return {first, second};
+    }
+
+ // Keys passed to TEvWatchPathId when subscribing to the STD / FIFO table paths;
+ // the TEvWatchNotifyUpdated handler compares the notification's Key against
+ // FIFO_PATH_SUBSCRIPTION_KEY to tell which table was updated.
+ constexpr ui64 STD_PATH_SUBSCRIPTION_KEY = 1;
+ constexpr ui64 FIFO_PATH_SUBSCRIPTION_KEY = 2;
+}
+
+namespace NKikimr::NSQS {
+    // Stores the subscription parameters as given; NodeId stays empty until a node is resolved.
+    TNodeTrackerActor::TSubscriberInfo::TSubscriberInfo(
+        ui64 queueIdNumber,
+        bool isFifo,
+        std::optional<ui64> specifiedLeaderTabletId,
+        std::optional<ui32> nodeId
+    )
+        : QueueIdNumber{queueIdNumber}
+        , IsFifo{isFifo}
+        , SpecifiedLeaderTabletId{specifiedLeaderTabletId}
+        , NodeId{nodeId}
+    {
+    }
+
+    // Prefix prepended to the node tracker's log messages.
+    const char* TNodeTrackerActor::GetLogPrefix() {
+        static constexpr char LogPrefix[] = "[Node tracker] ";
+        return LogPrefix;
+    }
+
+ // Remembers the scheme cache actor and precomputes the split paths of the
+ // common STD and FIFO "Messages" tables under the configured SQS root.
+ TNodeTrackerActor::TNodeTrackerActor(NActors::TActorId schemeCacheActor)
+ : SchemeCacheActor(schemeCacheActor)
+ , TablePathSTD(NKikimr::SplitPath(Cfg().GetRoot() + "/.STD/Messages"))
+ , TablePathFIFO(NKikimr::SplitPath(Cfg().GetRoot() + "/.FIFO/Messages"))
+ {
+ }
+
+    // Actor start-up: binds to the SQS service actor of this node, switches to
+    // WorkFunc, requests the table descriptions immediately and arms the
+    // periodic unused-tablets cleanup.
+    void TNodeTrackerActor::Bootstrap(const NActors::TActorContext& ctx) {
+        const auto selfNodeId = SelfId().NodeId();
+
+        ParentActor = MakeSqsServiceID(selfNodeId);
+        Become(&TNodeTrackerActor::WorkFunc);
+
+        ScheduleDescribeTables(TDuration::Zero(), ctx);
+        Schedule(CLEANUP_UNUSED_TABLETS_PERIOD, new TEvents::TEvWakeup());
+
+        LOG_SQS_DEBUG(GetLogPrefix() << "bootstrap on node=" << selfNodeId);
+    }
+
+    // Periodic cleanup: closes pipes to and forgets tablets that have had no
+    // subscribers for at least CLEANUP_UNUSED_TABLETS_PERIOD, then re-arms the timer.
+    void TNodeTrackerActor::HandleWakeup(TEvWakeup::TPtr&, const NActors::TActorContext& ctx) {
+        const TInstant now = ctx.Now();
+
+        for (auto it = LastAccessOfTabletWithoutSubscribers.begin(); it != LastAccessOfTabletWithoutSubscribers.end();) {
+            const ui64 tabletId = it->first;
+            const TInstant lastAccess = it->second;
+            if (now - lastAccess < CLEANUP_UNUSED_TABLETS_PERIOD) {
+                ++it;
+                continue;
+            }
+
+            auto infoIt = TabletsInfo.find(tabletId);
+            if (infoIt != TabletsInfo.end()) {
+                ClosePipeToTablet(infoIt->second);
+                TabletsInfo.erase(infoIt);
+            } else {
+                LOG_SQS_ERROR(GetLogPrefix() << "unknown tabletId=" << tabletId << " with last access at " << lastAccess << " in unused tablets cleanup");
+            }
+
+            // Drop the expired record in both cases: keeping a record for an unknown
+            // tablet would only repeat the same error on every wakeup.
+            // Advance before erasing so the loop iterator stays valid.
+            LastAccessOfTabletWithoutSubscribers.erase(it++);
+        }
+
+        Schedule(CLEANUP_UNUSED_TABLETS_PERIOD, new TEvents::TEvWakeup());
+    }
+
+    // Schedules, after runAfter, a scheme cache navigate request for the two
+    // common tables (STD first, FIFO second); the reply is addressed to this actor.
+    void TNodeTrackerActor::ScheduleDescribeTables(TDuration runAfter, const NActors::TActorContext& ctx) {
+        LOG_SQS_NOTICE(GetLogPrefix() << "schedule describe tables after " << runAfter);
+
+        auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+        //navigate->DatabaseName = Cfg().GetRoot();
+
+        auto& entries = navigate->ResultSet;
+        entries.resize(2);
+        entries[0].Path = TablePathSTD;
+        entries[1].Path = TablePathFIFO;
+
+        // Ownership of the navigate request passes to the event.
+        auto* navigateEvent = new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release());
+        ctx.ExecutorThread.ActorSystem->Schedule(
+            runAfter,
+            new IEventHandle(SchemeCacheActor, SelfId(), navigateEvent)
+        );
+    }
+
+    // Logs the failure and retries the describe request with a doubled delay:
+    // the previous delay is raised to at least 100 ms before doubling and the
+    // result is capped at DESCRIBE_TABLES_PERIOD_MAX.
+    void TNodeTrackerActor::DescribeTablesFailed(const TString& error, const NActors::TActorContext& ctx) {
+        LOG_SQS_ERROR(GetLogPrefix() << "describe tables failed: " << error);
+
+        const TDuration retryBase = Max(DescribeTablesRetyPeriod, TDuration::MilliSeconds(100));
+        DescribeTablesRetyPeriod = Min(DESCRIBE_TABLES_PERIOD_MAX, 2 * retryBase);
+
+        ScheduleDescribeTables(DescribeTablesRetyPeriod, ctx);
+    }
+
+    // Handles the result of a pipe connection attempt.
+    //  - Unknown tablet: warn and ignore.
+    //  - Failed and tablet is dead: its subscribers are moved away from the tablet.
+    //  - Failed otherwise: reconnect.
+    //  - Success: remember the pipe server and notify every subscriber whose
+    //    resolved node is unset or differs from the server's node.
+    void TNodeTrackerActor::HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
+        auto* msg = ev->Get();
+        const ui64 tabletId = msg->TabletId;
+
+        const auto tabletIt = TabletsInfo.find(tabletId);
+        if (tabletIt == TabletsInfo.end()) {
+            LOG_SQS_WARN(GetLogPrefix() << "connected to unrequired tablet. Tablet id: [" << tabletId << "]. Client pipe actor: " << msg->ClientId << ". Server pipe actor: " << msg->ServerId);
+            return;
+        }
+        TTabletInfo& tablet = tabletIt->second;
+
+        if (msg->Status != NKikimrProto::OK) {
+            LOG_SQS_WARN(GetLogPrefix() << "failed to connect to tablet " << tabletId << " dead=" << msg->Dead);
+            if (msg->Dead) {
+                MoveSubscribersFromTablet(tabletId, tablet, ctx);
+            } else {
+                ReconnectToTablet(tabletId);
+            }
+            return;
+        }
+
+        LOG_SQS_DEBUG(GetLogPrefix() << "connected to tabletId [" << tabletId << "]. Client pipe actor: " << msg->ClientId << ". Server pipe actor: " << msg->ServerId);
+        tablet.PipeServer = msg->ServerId;
+
+        const ui32 leaderNodeId = tablet.PipeServer.NodeId();
+        for (auto& [subscriptionId, subscriber] : tablet.Subscribers) {
+            const bool upToDate = subscriber->NodeId && subscriber->NodeId.value() == leaderNodeId;
+            if (upToDate) {
+                continue;
+            }
+            subscriber->NodeId = leaderNodeId;
+            AnswerForSubscriber(subscriptionId, leaderNodeId);
+        }
+    }
+
+    // Reports the resolved node for a subscription to the parent (SQS service) actor.
+    void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId) {
+        auto status = std::make_unique<TSqsEvents::TEvNodeTrackerSubscriptionStatus>(subscriptionId, nodeId);
+        Send(ParentActor, status.release());
+    }
+
+    // A pipe was destroyed: reconnect if the tablet is still tracked, otherwise only warn.
+    void TNodeTrackerActor::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&) {
+        auto* msg = ev->Get();
+        const ui64 tabletId = msg->TabletId;
+
+        if (TabletsInfo.find(tabletId) == TabletsInfo.end()) {
+            LOG_SQS_WARN(GetLogPrefix() << " disconnected from unrequired tablet id: [" << tabletId << "]. Client pipe actor: " << msg->ClientId << ". Server pipe actor: " << msg->ServerId);
+            return;
+        }
+
+        LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << tabletId << " disconnected");
+        ReconnectToTablet(tabletId);
+    }
+
+    // Closes the pipe client of the tablet, if there is one, and resets both pipe actor ids.
+    void TNodeTrackerActor::ClosePipeToTablet(TTabletInfo& info) {
+        if (!info.PipeClient) {
+            return;
+        }
+        NTabletPipe::CloseClient(SelfId(), info.PipeClient);
+        info.PipeServer = TActorId();
+        info.PipeClient = TActorId();
+    }
+
+    // Opens a fresh pipe to the tablet (leader only, with aliveness check),
+    // closing any previous pipe first. Creates the TabletsInfo record if it does
+    // not exist yet. On reconnect the first retry is not performed instantly.
+    TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::ConnectToTablet(ui64 tabletId, bool isReconnect) {
+        LOG_SQS_DEBUG(GetLogPrefix() << "connect to tablet " << tabletId << " is_reconnect=" << isReconnect);
+
+        NTabletPipe::TClientConfig pipeConfig;
+        pipeConfig.AllowFollower = false;
+        pipeConfig.CheckAliveness = true;
+        pipeConfig.RetryPolicy = {.RetryLimitCount = 3, .MinRetryTime = TDuration::MilliSeconds(100), .DoFirstRetryInstantly = !isReconnect};
+
+        TTabletInfo& tablet = TabletsInfo[tabletId];
+        ClosePipeToTablet(tablet);
+        tablet.PipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, pipeConfig));
+        return tablet;
+    }
+
+    // Shorthand for ConnectToTablet() in reconnect mode.
+    TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::ReconnectToTablet(ui64 tabletId) {
+        constexpr bool isReconnect = true;
+        return ConnectToTablet(tabletId, isReconnect);
+    }
+
+    // Returns the record of a tracked tablet; an untracked tablet gets connected first.
+    TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::GetTabletInfo(ui64 tabletId) {
+        if (auto known = TabletsInfo.find(tabletId); known != TabletsInfo.end()) {
+            return known->second;
+        }
+        return ConnectToTablet(tabletId);
+    }
+
+    // Removes a subscription.
+    // A subscription attached to a tablet is detached from it; when the tablet is
+    // left without subscribers the moment is recorded for the unused-tablets cleanup.
+    // A subscription that is still waiting for partitions is dropped from the
+    // waiting list. Unknown ids only produce a warning.
+    void TNodeTrackerActor::RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx) {
+        const ui64 subscriptionId = request->Get()->SubscriptionId;
+        LOG_SQS_DEBUG(GetLogPrefix() << "remove subscriber with id=" << subscriptionId);
+
+        auto it = TabletPerSubscriptionId.find(subscriptionId);
+        if (it == TabletPerSubscriptionId.end()) {
+            auto subscriptionIt = SubscriptionsAwaitingPartitionsUpdates.find(subscriptionId);
+            if (subscriptionIt != SubscriptionsAwaitingPartitionsUpdates.end()) {
+                SubscriptionsAwaitingPartitionsUpdates.erase(subscriptionIt);
+            } else {
+                LOG_SQS_WARN("Node tracker removing unknown subscription " << subscriptionId);
+            }
+            return;
+        }
+
+        const ui64 tabletId = it->second;
+        auto tabletIt = TabletsInfo.find(tabletId);
+        if (tabletIt != TabletsInfo.end()) {
+            auto& info = tabletIt->second;
+            info.Subscribers.erase(subscriptionId);
+            if (info.Subscribers.empty()) {
+                LastAccessOfTabletWithoutSubscribers[tabletId] = ctx.Now();
+            }
+        } else {
+            // tabletIt is end() here and must not be dereferenced; report the
+            // tablet id recorded for the subscription instead.
+            LOG_SQS_WARN("Node tracker removing subscription " << subscriptionId << " that is not found in the tablet information " << tabletId);
+        }
+        TabletPerSubscriptionId.erase(it);
+    }
+
+    // A subscriber with an explicitly specified leader tablet never waits.
+    // Any other subscriber waits until the partitions of its common table
+    // (FIFO or STD) have been received.
+    bool TNodeTrackerActor::SubscriberMustWait(const TSubscriberInfo& subscriber) const {
+        if (subscriber.SpecifiedLeaderTabletId) {
+            return false;
+        }
+        const auto& keyRanges = subscriber.IsFifo ? TabletsPerEndKeyRangeFIFO : TabletsPerEndKeyRangeSTD;
+        return keyRanges.empty();
+    }
+
+    // Handles a subscribe request: the subscriber is attached to its tablet right
+    // away, or parked until the table partitions are known.
+    void TNodeTrackerActor::AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx) {
+        const auto& req = *request->Get();
+        LOG_SQS_DEBUG(GetLogPrefix() << "add subscriber on init with id=" << req.SubscriptionId
+            << " queue_id_number=" << req.QueueIdNumber << " is_fifo=" << req.IsFifo
+            << " tablet_id=" << req.TabletId.value_or(0)
+        );
+
+        auto subscriber = std::make_unique<TSubscriberInfo>(req.QueueIdNumber, req.IsFifo, req.TabletId);
+        if (!SubscriberMustWait(*subscriber)) {
+            AddSubscriber(req.SubscriptionId, std::move(subscriber), ctx);
+            return;
+        }
+
+        SubscriptionsAwaitingPartitionsUpdates[req.SubscriptionId] = std::move(subscriber);
+    }
+
+    // Attaches a subscriber to the tablet serving it. If the pipe server of that
+    // tablet is already known, the node is reported immediately. A tablet that
+    // just got its only subscriber is removed from the unused-tablets list.
+    void TNodeTrackerActor::AddSubscriber(ui64 subscriptionId, std::unique_ptr<TSubscriberInfo> subscriber, const NActors::TActorContext&) {
+        const ui64 tabletId = GetTabletId(*subscriber);
+        TabletPerSubscriptionId[subscriptionId] = tabletId;
+
+        TTabletInfo& tablet = GetTabletInfo(tabletId);
+        if (tablet.PipeServer) {
+            const ui32 leaderNodeId = tablet.PipeServer.NodeId();
+            subscriber->NodeId = leaderNodeId;
+            AnswerForSubscriber(subscriptionId, leaderNodeId);
+        }
+
+        LOG_SQS_DEBUG(GetLogPrefix() << "add subscriber queue_id_number=" << subscriber->QueueIdNumber
+            << " leader_tablet_specified=" << subscriber->SpecifiedLeaderTabletId.has_value()
+            << " tablet_id=" << tabletId
+            << " node_resolved=" << subscriber->NodeId.has_value() << "/" << subscriber->NodeId.value_or(0));
+
+        auto& subscribers = tablet.Subscribers;
+        subscribers[subscriptionId] = std::move(subscriber);
+        const bool isOnlySubscriber = subscribers.size() == 1;
+        if (isOnlySubscriber) {
+            LastAccessOfTabletWithoutSubscribers.erase(tabletId);
+        }
+    }
+
+    // Finds the tablet whose range end key is the first one not less than keyPrefix.
+    // Aborts (Y_VERIFY) when no such range exists.
+    ui64 TNodeTrackerActor::GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const {
+        const auto range = tabletsPerEndKeyRange.lower_bound(keyPrefix);
+        Y_VERIFY(range != tabletsPerEndKeyRange.end());
+        return range->second;
+    }
+
+    // Resolves the tablet for a subscriber: the explicitly specified leader tablet
+    // if present, otherwise a lookup of the {hash, queue id} key prefix in the
+    // FIFO or STD key ranges (the STD hash is computed for shard 0).
+    ui64 TNodeTrackerActor::GetTabletId(const TSubscriberInfo& subscriber) const {
+        if (subscriber.SpecifiedLeaderTabletId) {
+            return subscriber.SpecifiedLeaderTabletId.value();
+        }
+
+        const ui64 queueId = subscriber.QueueIdNumber;
+        if (subscriber.IsFifo) {
+            return GetTabletId(TabletsPerEndKeyRangeFIFO, TKeyPrefix(GetKeysHash(queueId), queueId));
+        }
+        return GetTabletId(TabletsPerEndKeyRangeSTD, TKeyPrefix(GetKeysHash(queueId, 0 /*shard*/), queueId));
+    }
+
+    // Id-based overload: does nothing when the tablet is not tracked.
+    void TNodeTrackerActor::MoveSubscribersFromTablet(ui64 tabletId, const NActors::TActorContext& ctx) {
+        if (auto known = TabletsInfo.find(tabletId); known != TabletsInfo.end()) {
+            MoveSubscribersFromTablet(tabletId, known->second, ctx);
+        }
+    }
+
+    // Detaches every subscriber from the tablet and parks it in the list of
+    // subscriptions awaiting a partitions update. The tablet is then recorded
+    // as having no subscribers since now.
+    void TNodeTrackerActor::MoveSubscribersFromTablet(ui64 tabletId, TTabletInfo& info, const NActors::TActorContext& ctx) {
+        LOG_SQS_DEBUG(GetLogPrefix() << "move subscribers from " << tabletId);
+        if (info.Subscribers.empty()) {
+            return;
+        }
+
+        for (auto& [subscriptionId, subscriber] : info.Subscribers) {
+            SubscriptionsAwaitingPartitionsUpdates[subscriptionId] = std::move(subscriber);
+            TabletPerSubscriptionId.erase(subscriptionId);
+        }
+        info.Subscribers.clear();
+
+        LastAccessOfTabletWithoutSubscribers[tabletId] = ctx.Now();
+    }
+
+ // Compares the previously known partitioning (`current`) with the freshly
+ // received one (`actual`) and moves subscribers away from every tablet of
+ // `current` whose key interval is not present in `actual` with the same tablet.
+ // Both maps are keyed by the end key of a range and map to a tablet id; an
+ // interval is formed from the previously seen end key (starting at {0, 0})
+ // and the end key of the entry under the iterator.
+ // Aborts (Y_VERIFY) unless every entry of `current` has been visited when the
+ // loop stops.
+ void TNodeTrackerActor::MoveSubscribersAfterKeyRangeChanged(
+ const TMap<TKeyPrefix, ui64>& current,
+ const TMap<TKeyPrefix, ui64>& actual,
+ const NActors::TActorContext& ctx
+ ) {
+ TKeyPrefix lastCurrent{0, 0};
+ TKeyPrefix lastActual{0, 0};
+
+ auto currentIt = current.begin();
+ auto actualIt = actual.begin();
+ // The walk stops as soon as either map is exhausted.
+ while (currentIt != current.end() && actualIt != actual.end()) {
+ std::pair<TKeyPrefix, TKeyPrefix> intervalC(lastCurrent, currentIt->first);
+ std::pair<TKeyPrefix, TKeyPrefix> intervalA(lastActual, actualIt->first);
+ if (intervalA == intervalC) {
+ // Same interval: subscribers move only when the serving tablet changed.
+ if (currentIt->second != actualIt->second) {
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ }
+ ++currentIt;
+ ++actualIt;
+ } else if (intervalC.second < intervalA.first) { // don't intersect (CCCCC]...(AAAAA]
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ ++currentIt;
+ } else if (intervalA.second < intervalC.first) { // don't intersect (AAAAA]...(CCCCC]
+ ++actualIt;
+ } else { // intersect
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ // Advance the side whose interval ends first (the current side on a tie).
+ if (intervalC.second <= intervalA.second) {
+ ++currentIt;
+ } else {
+ ++actualIt;
+ }
+ }
+ // NOTE(review): both bounds are advanced on every iteration, even when only
+ // one of the iterators moved — confirm this is intended.
+ lastCurrent = intervalC.second;
+ lastActual = intervalA.second;
+ }
+ Y_VERIFY(currentIt == current.end());
+ }
+
+    // Rebuilds the "range end key -> datashard id" map from a table description,
+    // moves subscribers away from tablets affected by the change and then
+    // replaces the stored map with the new one.
+    void TNodeTrackerActor::UpdateKeyRanges(
+        TMap<TKeyPrefix, ui64>& currentTabletsPerEndKeyRange,
+        const NKikimrSchemeOp::TPathDescription& description,
+        const NActors::TActorContext& ctx
+    ) {
+        TMap<TKeyPrefix, ui64> tabletsPerEndKeyRange;
+        // Iterate by const reference: copying a partition message per iteration is pure overhead.
+        for (const auto& part : description.GetTablePartitions()) {
+            TSerializedCellVec endKeyPrefix(part.GetEndOfRangeKeyPrefix());
+            auto cells = endKeyPrefix.GetCells();
+            auto endKeyRange = GetKeysPrefix(cells);
+            tabletsPerEndKeyRange[endKeyRange] = part.GetDatashardId();
+        }
+
+        // Must run before the assignment below: it compares the old map with the new one.
+        MoveSubscribersAfterKeyRangeChanged(currentTabletsPerEndKeyRange, tabletsPerEndKeyRange, ctx);
+        currentTabletsPerEndKeyRange = std::move(tabletsPerEndKeyRange);
+    }
+
+    // Handles the scheme cache reply for the two common tables. Entries are
+    // processed in order: a non-Ok entry schedules a retry and stops processing;
+    // for each Ok entry a watch on the table path id is requested, keyed so that
+    // later notifications can be attributed to the STD or the FIFO table.
+    // The retry period is reset once all entries were Ok.
+    void TNodeTrackerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const NActors::TActorContext& ctx) {
+        LOG_SQS_DEBUG(GetLogPrefix() << "got tables description.");
+        const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
+        Y_VERIFY(navigate->ResultSet.size() == 2);
+
+        // Entries are only read, so bind by const reference instead of copying each
+        // one, and use a distinct name rather than shadowing the request pointer.
+        for (const auto& entry : navigate->ResultSet) {
+            if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+                DescribeTablesFailed(TStringBuilder() << "describe tables failed : " << entry.ToString(), ctx);
+                return;
+            }
+            LOG_SQS_INFO(GetLogPrefix() << "got table description: " << entry.ToString());
+            const bool isFifo = (entry.Path == TablePathFIFO);
+            const ui64 pathSubscriptionKey = isFifo ? FIFO_PATH_SUBSCRIPTION_KEY : STD_PATH_SUBSCRIPTION_KEY;
+            Send(SchemeCacheActor, new TEvTxProxySchemeCache::TEvWatchPathId(entry.TableId.PathId, pathSubscriptionKey));
+        }
+        DescribeTablesRetyPeriod = DESCRIBE_TABLES_PERIOD_MIN;
+    }
+
+    // Handles an updated table description from the scheme cache watch: refreshes
+    // the key ranges of the table identified by the notification key, then
+    // attaches every parked subscriber that no longer has to wait.
+    void TNodeTrackerActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const NActors::TActorContext& ctx) {
+        auto* msg = ev->Get();
+        const auto& describeResult = *msg->Result;
+        const auto& pathDescription = describeResult.GetPathDescription();
+        LOG_SQS_INFO(GetLogPrefix() << "got actual description for ["
+            << msg->PathId << " / " << describeResult.GetPath() << "] partitions=" << pathDescription.GetTablePartitions().size());
+
+        auto& keyRanges = (msg->Key == FIFO_PATH_SUBSCRIPTION_KEY)
+            ? TabletsPerEndKeyRangeFIFO
+            : TabletsPerEndKeyRangeSTD;
+        UpdateKeyRanges(keyRanges, pathDescription, ctx);
+
+        for (auto it = SubscriptionsAwaitingPartitionsUpdates.begin(); it != SubscriptionsAwaitingPartitionsUpdates.end();) {
+            if (SubscriberMustWait(*it->second)) {
+                ++it;
+                continue;
+            }
+            AddSubscriber(it->first, std::move(it->second), ctx);
+            // Step past the entry before erasing it so the loop iterator stays valid.
+            SubscriptionsAwaitingPartitionsUpdates.erase(it++);
+        }
+    }
+
+} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/node_tracker.h b/ydb/core/ymq/actor/node_tracker.h
new file mode 100644
index 00000000000..aa4dd941615
--- /dev/null
+++ b/ydb/core/ymq/actor/node_tracker.h
@@ -0,0 +1,121 @@
+#pragma once
+
+#include "events.h"
+#include "log.h"
+
+#include <ydb/core/base/tablet_pipe.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NSQS {
+
+// Actor that tracks, for every subscription, the node hosting the tablet that
+// serves the subscriber's queue. The node is taken from the server side of a
+// tablet pipe and reported to the SQS service actor of this node with
+// TEvNodeTrackerSubscriptionStatus. The tablet is either specified explicitly
+// in the subscribe request or found through the key ranges of the common
+// STD / FIFO tables, which are kept up to date via scheme cache notifications.
+class TNodeTrackerActor : public TActorBootstrapped<TNodeTrackerActor> {
+private:
+    // Period of the cleanup pass and minimal idle time before a tablet without subscribers is forgotten.
+    static constexpr TDuration CLEANUP_UNUSED_TABLETS_PERIOD = TDuration::Minutes(10);
+    // Bounds of the retry delay for the describe-tables request.
+    static constexpr TDuration DESCRIBE_TABLES_PERIOD_MIN = TDuration::Zero();
+    static constexpr TDuration DESCRIBE_TABLES_PERIOD_MAX = TDuration::Seconds(5);
+
+    // First two key components of a table range end key.
+    using TKeyPrefix = std::tuple<ui64, ui64>;
+
+private:
+    struct TSubscriberInfo {
+        using TPtr = std::unique_ptr<TSubscriberInfo>;
+
+        TSubscriberInfo(
+            ui64 queueIdNumber,
+            bool isFifo,
+            std::optional<ui64> specifiedLeaderTabletId,
+            std::optional<ui32> nodeId = std::nullopt
+        );
+
+        const ui64 QueueIdNumber;
+        const bool IsFifo;
+        // When set, this tablet is used directly and no key range lookup is done.
+        const std::optional<ui64> SpecifiedLeaderTabletId;
+
+        // Last node reported for this subscriber; empty until resolved.
+        std::optional<ui32> NodeId;
+    };
+
+    struct TTabletInfo {
+        TActorId PipeClient;
+        TActorId PipeServer;
+        // Subscription id -> subscriber attached to this tablet.
+        THashMap<ui64, TSubscriberInfo::TPtr> Subscribers;
+    };
+
+public:
+    static const char* GetLogPrefix();
+
+    // explicit: an actor id must not silently convert into a tracker actor.
+    explicit TNodeTrackerActor(NActors::TActorId schemeCacheActor);
+    void Bootstrap(const NActors::TActorContext& ctx);
+
+    // The only state function of the actor; unknown events are logged as errors.
+    void WorkFunc(TAutoPtr<IEventHandle>& ev, const NActors::TActorContext& ctx) {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(TEvWakeup, HandleWakeup);
+            HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleCacheNavigateResponse);
+            HFunc(TSqsEvents::TEvNodeTrackerSubscribeRequest, AddSubscriber);
+            HFunc(TSqsEvents::TEvNodeTrackerUnsubscribeRequest, RemoveSubscriber);
+            HFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected);
+            HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected);
+            HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+        default:
+            LOG_SQS_ERROR("Unknown type of event came to SQS node tracker actor: " << ev->Type << " (" << ev->GetBase()->ToString() << "), sender: " << ev->Sender);
+        }
+    }
+
+    void HandleWakeup(TEvWakeup::TPtr&, const NActors::TActorContext& ctx);
+    void ScheduleDescribeTables(TDuration runAfter, const NActors::TActorContext& ctx);
+    void DescribeTablesFailed(const TString& error, const NActors::TActorContext& ctx);
+
+    void HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
+    void HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&);
+    void ClosePipeToTablet(TTabletInfo& info);
+    TTabletInfo& ConnectToTablet(ui64 tabletId, bool isReconnect = false);
+    TTabletInfo& ReconnectToTablet(ui64 tabletId);
+    TTabletInfo& GetTabletInfo(ui64 tabletId);
+
+
+    ui64 GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const;
+    ui64 GetTabletId(const TSubscriberInfo& subscriber) const;
+
+    void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId);
+    void RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
+    bool SubscriberMustWait(const TSubscriberInfo& subscriber) const;
+    void AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
+    void AddSubscriber(ui64 subscriptionId, std::unique_ptr<TSubscriberInfo> subscriber, const NActors::TActorContext&);
+    void MoveSubscribersFromTablet(ui64 tabletId, const NActors::TActorContext& ctx);
+    void MoveSubscribersFromTablet(ui64 tabletId, TTabletInfo& info, const NActors::TActorContext& ctx);
+    void MoveSubscribersAfterKeyRangeChanged(
+        const TMap<TKeyPrefix, ui64>& current,
+        const TMap<TKeyPrefix, ui64>& actual,
+        const NActors::TActorContext& ctx
+    );
+
+    void UpdateKeyRanges(
+        TMap<TKeyPrefix, ui64>& currentTabletsPerEndKeyRange,
+        const NKikimrSchemeOp::TPathDescription& description,
+        const NActors::TActorContext& ctx
+    );
+
+    void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const NActors::TActorContext& ctx);
+    void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const NActors::TActorContext& ctx);
+
+private:
+    // Current delay before the next describe-tables retry.
+    TDuration DescribeTablesRetyPeriod = DESCRIBE_TABLES_PERIOD_MIN;
+
+    // Range end key -> datashard id, per common table.
+    TMap<TKeyPrefix, ui64> TabletsPerEndKeyRangeSTD;
+    TMap<TKeyPrefix, ui64> TabletsPerEndKeyRangeFIFO;
+
+    // Tablet id -> pipe state and attached subscribers.
+    THashMap<ui64, TTabletInfo> TabletsInfo;
+    // Subscription id -> subscriber that is not attached to any tablet yet.
+    THashMap<ui64, TSubscriberInfo::TPtr> SubscriptionsAwaitingPartitionsUpdates;
+    // Subscription id -> tablet id the subscription is attached to.
+    THashMap<ui64, ui64> TabletPerSubscriptionId;
+    // Tablet id -> moment the tablet was left without subscribers.
+    THashMap<ui64, TInstant> LastAccessOfTabletWithoutSubscribers;
+
+    TActorId ParentActor;
+    TActorId SchemeCacheActor;
+
+    const TVector<TString> TablePathSTD;
+    const TVector<TString> TablePathFIFO;
+};
+
+} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/queues_list_reader.cpp b/ydb/core/ymq/actor/queues_list_reader.cpp
index 2cdbcfe2cea..09168e6d0d3 100644
--- a/ydb/core/ymq/actor/queues_list_reader.cpp
+++ b/ydb/core/ymq/actor/queues_list_reader.cpp
@@ -132,6 +132,7 @@ void TQueuesListReader::OnQueuesList(const TSqsEvents::TEvExecuted::TRecord& rec
rec.ShardsCount = row["Shards"];
rec.DlqName = row["DlqName"];
rec.CreatedTimestamp = TInstant::MilliSeconds(ui64(row["CreatedTimestamp"]));
+ rec.IsFifo = row["FifoQueue"];
}
const bool truncated = val["truncated"];
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index c8fdfe0a915..ecec77eb174 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -12,6 +12,7 @@
#include "user_settings_names.h"
#include "user_settings_reader.h"
#include "index_events_processor.h"
+#include "node_tracker.h"
#include <ydb/public/lib/value/value.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
@@ -55,10 +56,12 @@ using NKikimr::NClient::TValue;
const TString LEADER_CREATE_REASON_USER_REQUEST = "UserRequestOnNode";
const TString LEADER_CREATE_REASON_LOCAL_TABLET = "LocalTablet";
const TString LEADER_DESTROY_REASON_LAST_REF = "LastReference";
-const TString LEADER_DESTROY_REASON_TABLET_PIPE_CLOSED = "TabletPipeClosed";
+const TString LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE = "LeaderTabletOnAnotherNode";
+const TString LEADER_DESTROY_REASON_REMOVE_INFO = "RemoveQueueInfo";
constexpr ui64 LIST_USERS_WAKEUP_TAG = 1;
constexpr ui64 LIST_QUEUES_WAKEUP_TAG = 2;
+constexpr ui64 CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG = 3;
constexpr size_t EARLY_REQUEST_USERS_LIST_MAX_BUDGET = 10;
constexpr i64 EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET = 5; // per user
@@ -69,7 +72,7 @@ bool IsInternalFolder(const TString& folder) {
struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
TQueueInfo(
- TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, TString customName,
+ TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, bool isFifo, TString customName,
TString folderId, ui32 tablesFormat, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters,
const TIntrusivePtr<TFolderCounters>& folderCounters,
const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser,
@@ -84,6 +87,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
, ShardsCount_(shardsCount)
, RootUrl_(std::move(rootUrl))
, LeaderTabletId_(leaderTabletId)
+ , IsFifo_(isFifo)
, Counters_(userCounters->CreateQueueCounters(QueueName_, FolderId_, insertCounters))
, UserCounters_(userCounters)
, FolderCounters_(folderCounters)
@@ -92,36 +96,19 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
{
}
- void ConnectToLeaderTablet(bool firstTime = true) {
- if (ConnectingToLeaderTablet_) {
- return;
- }
- ClosePipeToLeaderTablet();
- ConnectingToLeaderTablet_ = true;
- NTabletPipe::TClientConfig cfg;
- cfg.AllowFollower = false;
- cfg.CheckAliveness = true;
- cfg.RetryPolicy = {.RetryLimitCount = 3, .MinRetryTime = TDuration::MilliSeconds(100), .DoFirstRetryInstantly = firstTime};
- PipeClient_ = TActivationContext::Register(NTabletPipe::CreateClient(SelfId(), LeaderTabletId_, cfg));
- LOG_SQS_DEBUG("Connect to leader tablet [" << LeaderTabletId_ << "] for queue [" << UserName_ << "/" << QueueName_ << "]. Pipe client actor: " << PipeClient_);
- }
-
- void SetLeaderPipeServer(const TActorId& pipeServer) {
- LeaderPipeServer_ = pipeServer;
-
- const ui64 nodeId = LeaderPipeServer_.NodeId();
- if (nodeId == SelfId().NodeId()) {
- IncLocalLeaderRef(LEADER_CREATE_REASON_LOCAL_TABLET); // ref for service
- }
+ // True when the leader node is known and it is the node this actor runs on.
+ bool LeaderMustBeOnCurrentNode() const {
+ return LeaderNodeId_ && LeaderNodeId_.value() == SelfId().NodeId();
}
- void ClosePipeToLeaderTablet() {
- if (LeaderPipeServer_.NodeId() == SelfId().NodeId()) {
- DecLocalLeaderRef(LEADER_DESTROY_REASON_TABLET_PIPE_CLOSED); // ref for service
+ void SetLeaderNodeId(ui32 nodeId) {
+ if (LeaderNodeId_ && LeaderNodeId_ == nodeId) {
+ return;
}
- if (PipeClient_) {
- NTabletPipe::CloseClient(SelfId(), PipeClient_);
- PipeClient_ = LeaderPipeServer_ = TActorId();
+ LeaderNodeId_ = nodeId;
+ if (LeaderMustBeOnCurrentNode()) {
+ StartLocalLeader(LEADER_CREATE_REASON_LOCAL_TABLET);
+ } else {
+ StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE);
}
}
@@ -140,6 +127,12 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
}
}
+ // Stops the local leader only when the leader is not expected on this node
+ // and no local references to it remain.
+ void StopLocalLeaderIfNeeded(const TString& reason) {
+ if (!LeaderMustBeOnCurrentNode() && LocalLeaderRefCount_ == 0) {
+ StopLocalLeader(reason);
+ }
+ }
+
void StopLocalLeader(const TString& reason) {
if (LocalLeader_) {
Counters_ = Counters_->GetCountersForNotLeaderNode();
@@ -162,9 +155,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
void DecLocalLeaderRef(const TString& reason) {
Y_VERIFY(LocalLeaderRefCount_ > 0);
--LocalLeaderRefCount_;
- if (LocalLeaderRefCount_ == 0) {
- StopLocalLeader(reason);
- }
+ StopLocalLeaderIfNeeded(reason);
}
TActorIdentity SelfId() const {
@@ -180,20 +171,21 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
ui64 ShardsCount_;
TString RootUrl_;
ui64 LeaderTabletId_ = 0;
+ bool IsFifo_ = false;
TIntrusivePtr<TQueueCounters> Counters_;
TIntrusivePtr<TUserCounters> UserCounters_;
TIntrusivePtr<TFolderCounters> FolderCounters_;
- TActorId PipeClient_;
- TActorId LeaderPipeServer_;
+ std::optional<ui32> LeaderNodeId_;
+ ui64 NodeTrackingSubscriptionId = 0;
+
TActorId LocalLeader_;
TActorId SchemeCache_;
ui64 LocalLeaderRefCount_ = 0;
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResourcesForUser_;
// State machine
- bool ConnectingToLeaderTablet_ = false;
- TInstant DisconnectedFrom_ = TInstant::Now();
THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_;
+ TInstant NodeUnknownSince_ = TInstant::Now();
};
struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
@@ -300,6 +292,7 @@ void TSqsService::Bootstrap() {
AggregatedUserCounters_->ShowDetailedCounters(TInstant::Max());
InitSchemeCache();
+ NodeTrackerActor_ = Register(new TNodeTrackerActor(SchemeCache_));
Register(new TUserSettingsReader(AggregatedUserCounters_->GetTransactionCounters()));
QueuesListReader_ = Register(new TQueuesListReader(AggregatedUserCounters_->GetTransactionCounters()));
@@ -341,8 +334,7 @@ STATEFN(TSqsService::StateFunc) {
hFunc(TEvWakeup, HandleWakeup);
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleDescribeSchemeResult);
hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
- hFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected);
- hFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected);
+ hFunc(TSqsEvents::TEvNodeTrackerSubscriptionStatus, HandleNodeTrackingSubscriptionStatus);
hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfiguration);
hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest);
hFunc(TSqsEvents::TEvInsertQueueCounters, HandleInsertQueueCounters);
@@ -436,15 +428,23 @@ void TSqsService::HandleGetLeaderNodeForQueueRequest(TSqsEvents::TEvGetLeaderNod
return;
}
- if (!queueIt->second->LeaderPipeServer_) {
+ auto queuePtr = queueIt->second;
+ if (!queuePtr->LeaderNodeId_) {
LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] is waiting for connection to leader tablet.");
- auto& queue = queueIt->second;
- queue->GetLeaderNodeRequests_.emplace(std::move(ev));
+
+ queuePtr->GetLeaderNodeRequests_.emplace(std::move(ev));
+ if (QueuesWithGetNodeWaitingRequests.empty()) {
+ Schedule(
+ TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs()),
+ new TEvWakeup(CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG)
+ );
+ }
+ QueuesWithGetNodeWaitingRequests.insert(queuePtr);
return;
}
- const ui64 nodeId = queueIt->second->LeaderPipeServer_.NodeId();
+ const ui32 nodeId = queuePtr->LeaderNodeId_.value();
RLOG_SQS_REQ_DEBUG(reqId, "Leader node for queue [" << userName << "/" << queueName << "] is " << nodeId);
Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, nodeId));
}
@@ -737,50 +737,28 @@ TSqsService::TUserInfoPtr TSqsService::GetUserOrWait(TAutoPtr<TEvent>& ev) {
return userIt->second;
}
-void TSqsService::HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
- auto queueIt = LeaderTabletIdToQueue_.find(ev->Get()->TabletId);
- if (queueIt == LeaderTabletIdToQueue_.end()) {
- LOG_SQS_WARN("Connected to unknown queue leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
- return;
- }
- const auto& queue = queueIt->second;
- queue->ConnectingToLeaderTablet_ = false;
-
- if (ev->Get()->Status != NKikimrProto::OK) {
- LOG_SQS_WARN("Failed to connect to queue [" << queue->UserName_ << "/" << queue->QueueName_ << "] leader tablet. Tablet id: [" << ev->Get()->TabletId << "]. Status: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
- const TInstant now = TActivationContext::Now();
- const TDuration timeDisconnecned = now - queue->DisconnectedFrom_;
- const TDuration leaderConnectTimeout = TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs());
- if (timeDisconnecned >= leaderConnectTimeout) {
- for (auto& req : queue->GetLeaderNodeRequests_) {
- RLOG_SQS_REQ_WARN(req->Get()->RequestId, "Can't connect to leader tablet for " << timeDisconnecned);
- Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader));
- }
- queue->GetLeaderNodeRequests_.clear();
- }
- queue->ConnectToLeaderTablet(false);
+void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev) {
+ ui64 subscriptionId = ev->Get()->SubscriptionId;
+ auto it = QueuePerNodeTrackingSubscription.find(subscriptionId);
+ if (it == QueuePerNodeTrackingSubscription.end()) {
+ LOG_SQS_WARN("Get node tracking status for unknown subscription id: " << subscriptionId);
+ Send(NodeTrackerActor_, new TSqsEvents::TEvNodeTrackerUnsubscribeRequest(subscriptionId));
return;
}
-
- LOG_SQS_DEBUG("Connected to queue [" << queueIt->second->UserName_ << "/" << queueIt->second->QueueName_ << "] leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
- queue->SetLeaderPipeServer(ev->Get()->ServerId);
- for (auto& req : queue->GetLeaderNodeRequests_) {
- RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Connected to leader tablet. Node id: " << queue->LeaderPipeServer_.NodeId());
- Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, queue->LeaderPipeServer_.NodeId()));
- }
- queue->GetLeaderNodeRequests_.clear();
-}
-
-void TSqsService::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
- auto queueIt = LeaderTabletIdToQueue_.find(ev->Get()->TabletId);
- if (queueIt != LeaderTabletIdToQueue_.end()) {
- queueIt->second->ConnectingToLeaderTablet_ = false;
- queueIt->second->DisconnectedFrom_ = TActivationContext::Now();
- LOG_SQS_DEBUG("Disconnected from queue [" << queueIt->second->UserName_ << "/" << queueIt->second->QueueName_ << "] leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
- queueIt->second->ConnectToLeaderTablet(false);
- } else {
- LOG_SQS_WARN("Disconnected from unknown queue leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
+ auto queuePtr = it->second;
+ auto& queue = *queuePtr;
+ auto nodeId = ev->Get()->NodeId;
+ queue.SetLeaderNodeId(nodeId);
+ LOG_SQS_DEBUG(
+ "Got node leader for queue [" << queue.UserName_ << "/" << queue.QueueName_
+ << "]. Node: " << nodeId << " subscription id: " << subscriptionId
+ );
+ for (auto& req : queue.GetLeaderNodeRequests_) {
+ RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Got node leader. Node id: " << nodeId);
+ Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, nodeId));
}
+ queue.GetLeaderNodeRequests_.clear();
+ QueuesWithGetNodeWaitingRequests.erase(queuePtr);
}
void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
@@ -798,9 +776,9 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
auto oldListIt = user->Queues_.begin();
while (oldListIt != user->Queues_.end() && newListIt != ev->Get()->SortedQueues.end() && newListIt->UserName == user->UserName_) {
if (oldListIt->first == newListIt->QueueName) { // the same queue
- if (oldListIt->second->LeaderTabletId_ != newListIt->LeaderTabletId) {
- LOG_SQS_WARN("Leader tablet id for queue " << oldListIt->first << " has been changed from "
- << oldListIt->second->LeaderTabletId_ << " to " << newListIt->LeaderTabletId << " (queue was recreated)");
+ if (oldListIt->second->Version_ != newListIt->Version) {
+ LOG_SQS_WARN("Queue version for queue " << oldListIt->first << " has been changed from "
+ << oldListIt->second->Version_ << " to " << newListIt->Version << " (queue was recreated)");
THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> oldQueueRequests;
oldQueueRequests.swap(oldListIt->second->GetLeaderNodeRequests_);
@@ -813,9 +791,12 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
newListIt->TablesFormat,
newListIt->Version,
newListIt->ShardsCount,
- newListIt->CreatedTimestamp);
- Y_VERIFY(oldListIt->second->ConnectingToLeaderTablet_);
+ newListIt->CreatedTimestamp,
+ newListIt->IsFifo);
oldQueueRequests.swap(oldListIt->second->GetLeaderNodeRequests_);
+ if (!oldListIt->second->GetLeaderNodeRequests_.empty()) {
+ QueuesWithGetNodeWaitingRequests.insert(oldListIt->second);
+ }
}
++oldListIt;
++newListIt;
@@ -832,7 +813,8 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
newListIt->TablesFormat,
newListIt->Version,
newListIt->ShardsCount,
- newListIt->CreatedTimestamp);
+ newListIt->CreatedTimestamp,
+ newListIt->IsFifo);
++oldListIt;
++newListIt;
}
@@ -851,7 +833,8 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
newListIt->TablesFormat,
newListIt->Version,
newListIt->ShardsCount,
- newListIt->CreatedTimestamp);
+ newListIt->CreatedTimestamp,
+ newListIt->IsFifo);
++newListIt;
}
@@ -1018,13 +1001,12 @@ void TSqsService::RemoveQueue(const TString& userName, const TString& queue) {
}
auto queuePtr = queueIt->second;
- queuePtr->ClosePipeToLeaderTablet();
+ CancleNodeTrackingSubscription(queuePtr);
for (auto& req : queuePtr->GetLeaderNodeRequests_) {
RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Removing queue [" << req->Get()->UserName << "/" << req->Get()->QueueName << "] from sqs service info");
Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue));
}
queuePtr->GetLeaderNodeRequests_.clear();
- LeaderTabletIdToQueue_.erase(queuePtr->LeaderTabletId_);
userIt->second->QueueByNameAndFolder_.erase(std::make_pair(queuePtr->CustomName_, queuePtr->FolderId_));
auto queuesCount = userIt->second->CountQueuesInFolder(queuePtr->FolderId_);
if (!queuesCount) {
@@ -1042,12 +1024,12 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
const ui32 tablesFormat,
const ui64 version,
const ui64 shardsCount,
- const TInstant createdTimestamp) {
+ const TInstant createdTimestamp,
+ bool isFifo) {
auto user = MutableUser(userName, false); // don't move requests because they are already moved in our caller
const TInstant now = TActivationContext::Now();
const TInstant timeToInsertCounters = createdTimestamp + TDuration::MilliSeconds(Cfg().GetQueueCountersExportDelayMs());
const bool insertCounters = now >= timeToInsertCounters;
-
auto folderCntrIter = user->FolderCounters_.find(folderId);
if (folderCntrIter == user->FolderCounters_.end()) {
folderCntrIter = user->FolderCounters_.insert(std::make_pair(folderId, user->Counters_->CreateFolderCounters(folderId, true))).first;
@@ -1057,12 +1039,11 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
}
auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(
- userName, queue, RootUrl_, leaderTabletId, customName, folderId, tablesFormat, version, shardsCount,
+ userName, queue, RootUrl_, leaderTabletId, isFifo, customName, folderId, tablesFormat, version, shardsCount,
user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters)))
).first;
auto queueInfo = ret->second;
- LeaderTabletIdToQueue_[leaderTabletId] = queueInfo;
user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo);
{
@@ -1109,12 +1090,54 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
}
user->GetQueueFolderIdAndCustomNameRequests_.erase(requests.first, requests.second);
}
-
- queueInfo->ConnectToLeaderTablet();
- LOG_SQS_DEBUG("Created queue record. Queue: [" << queue << "]. Leader tablet id: [" << leaderTabletId << "]. Pipe client actor: " << queueInfo->PipeClient_);
+
+ CreateNodeTrackingSubscription(queueInfo);
+ LOG_SQS_DEBUG("Created queue record. Queue: [" << queue << "]. QueueIdNumber: " << queueInfo->Version_ << ". Leader tablet id: [" << leaderTabletId << "]. Node tracker subscription: " << queueInfo->NodeTrackingSubscriptionId);
return ret;
}
+// Allocates the next subscription id for `queueInfo`, records the
+// id -> queue mapping and asks the node tracker actor to start reporting the
+// node for this queue.
+// The queue must not already hold a subscription (its id must be 0).
+void TSqsService::CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo) {
+    Y_VERIFY(!queueInfo->NodeTrackingSubscriptionId);
+
+    const ui64 subscriptionId = ++MaxNodeTrackingSubscriptionId;
+    queueInfo->NodeTrackingSubscriptionId = subscriptionId;
+    LOG_SQS_DEBUG("Create node tracking subscription queue_id_number=" << queueInfo->Version_
+        << " tables_format=" << queueInfo->TablesFormat_ << " subscription_id=" << subscriptionId
+    );
+
+    QueuePerNodeTrackingSubscription[subscriptionId] = queueInfo;
+
+    // The leader tablet id is forwarded only for tables format 0; for any
+    // other format the optional is left empty.
+    std::optional<ui64> fixedLeaderTabletId;
+    const bool hasFixedLeaderTablet = (queueInfo->TablesFormat_ == 0);
+    if (hasFixedLeaderTablet) {
+        fixedLeaderTabletId = queueInfo->LeaderTabletId_;
+    }
+
+    auto* subscribeRequest = new TSqsEvents::TEvNodeTrackerSubscribeRequest(
+        subscriptionId,
+        queueInfo->Version_,
+        queueInfo->IsFifo_,
+        fixedLeaderTabletId
+    );
+    Send(NodeTrackerActor_, subscribeRequest);
+}
+
+// Drops the node tracking subscription held by `queueInfo`: clears the id on
+// the queue, removes the id -> queue mapping, forgets the known leader node,
+// stops the local leader if needed and sends an unsubscribe request to the
+// node tracker actor.
+// The queue must currently hold a subscription (non-zero id).
+void TSqsService::CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo) {
+    const ui64 subscriptionId = queueInfo->NodeTrackingSubscriptionId;
+    LOG_SQS_DEBUG("Cancle node tracking subscription queue_id_number=" << queueInfo->Version_
+        << " tables_format=" << queueInfo->TablesFormat_ << " subscription_id=" << subscriptionId
+    );
+    Y_VERIFY(subscriptionId);
+
+    // Detach the id from the queue first so the queue can be subscribed again later.
+    queueInfo->NodeTrackingSubscriptionId = 0;
+    QueuePerNodeTrackingSubscription.erase(subscriptionId);
+
+    queueInfo->LeaderNodeId_.reset();
+    queueInfo->StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_REMOVE_INFO);
+
+    auto* unsubscribeRequest = new TSqsEvents::TEvNodeTrackerUnsubscribeRequest(subscriptionId);
+    Send(NodeTrackerActor_, unsubscribeRequest);
+}
+
void TSqsService::AnswerNoUserToRequests() {
AnswerNoUserToRequests(GetLeaderNodeRequests_);
AnswerNoUserToRequests(GetConfigurationRequests_);
@@ -1146,6 +1169,32 @@ void TSqsService::AnswerErrorToRequests(const TUserInfoPtr& user) {
AnswerErrorToRequests(user, user->CountQueuesRequests_);
}
+// Walks every queue that has pending "get leader node" requests. Queues whose
+// leader node has been unknown for the whole configured connect timeout get
+// all their pending requests answered with FailedToConnectToLeader and are
+// removed from the waiting set. If any queue is still waiting, the next wakeup
+// is scheduled after the smallest remaining time.
+void TSqsService::ProcessConnectTimeoutToLeader() {
+    const TDuration connectTimeout = TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs());
+    TDuration soonestWakeup = TDuration::Max();
+
+    for (auto queueIt = QueuesWithGetNodeWaitingRequests.begin(); queueIt != QueuesWithGetNodeWaitingRequests.end();) {
+        auto& queueInfo = **queueIt;
+        const auto unknownFor = TActivationContext::Now() - queueInfo.NodeUnknownSince_;
+        // NOTE(review): the check below relies on TDuration subtraction clamping
+        // at zero once the timeout has been exceeded - confirm.
+        const auto remaining = connectTimeout - unknownFor;
+
+        if (remaining != TDuration::Zero()) {
+            // Still within the timeout: remember when this queue has to be re-checked.
+            soonestWakeup = Min(soonestWakeup, remaining);
+            ++queueIt;
+            continue;
+        }
+
+        for (auto& request : queueInfo.GetLeaderNodeRequests_) {
+            RLOG_SQS_REQ_WARN(request->Get()->RequestId, "Can't connect to leader tablet for " << unknownFor);
+            Send(
+                request->Sender,
+                new TSqsEvents::TEvGetLeaderNodeForQueueResponse(
+                    request->Get()->RequestId,
+                    request->Get()->UserName,
+                    request->Get()->QueueName,
+                    TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader
+                )
+            );
+        }
+        queueInfo.GetLeaderNodeRequests_.clear();
+
+        // Advance before erasing so the loop iterator stays valid.
+        QueuesWithGetNodeWaitingRequests.erase(queueIt++);
+    }
+
+    if (soonestWakeup != TDuration::Max()) {
+        Schedule(soonestWakeup, new TEvWakeup(CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG));
+    }
+}
+
void TSqsService::HandleWakeup(TEvWakeup::TPtr& ev) {
Y_VERIFY(ev->Get()->Tag != 0);
switch (ev->Get()->Tag) {
@@ -1167,6 +1216,9 @@ void TSqsService::HandleWakeup(TEvWakeup::TPtr& ev) {
RequestSqsQueuesList();
}
break;
+ case CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG:
+ ProcessConnectTimeoutToLeader();
+ break;
}
}
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index 32588a07508..666db5f4be7 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -59,7 +59,11 @@ private:
void HandleInsertQueueCounters(TSqsEvents::TEvInsertQueueCounters::TPtr& ev);
void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev);
void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev);
+ void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev);
+ void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo);
+ void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo);
+ void ProcessConnectTimeoutToLeader();
void ScheduleRequestSqsUsersList();
void RequestSqsUsersList();
@@ -73,7 +77,7 @@ private:
void RemoveUser(const TString& userName);
std::map<TString, TQueueInfoPtr>::iterator AddQueue(const TString& userName, const TString& queue, ui64 leaderTabletId,
const TString& customName, const TString& folderId, const ui32 tablesFormat, const ui64 version,
- const ui64 shardsCount, const TInstant createdTimestamp);
+ const ui64 shardsCount, const TInstant createdTimestamp, bool isFifo);
void AnswerNoUserToRequests();
void AnswerNoQueueToRequests(const TUserInfoPtr& user);
@@ -138,7 +142,6 @@ private:
std::shared_ptr<TAlignedPagePoolCounters> AllocPoolCounters_;
TIntrusivePtr<TUserCounters> AggregatedUserCounters_;
TUsersMap Users_;
- THashMap<ui64, TQueueInfoPtr> LeaderTabletIdToQueue_;
THashMap<TActorId, TQueueInfoPtr> LocalLeaderRefs_; // referer -> queue info
TActorId SchemeCache_;
TActorId QueuesListReader_;
@@ -159,6 +162,11 @@ private:
THashMultiMap<TString, TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr> GetQueueFolderIdAndCustomNameRequests_; // user name -> request
THashMultiMap<TString, TSqsEvents::TEvCountQueues::TPtr> CountQueuesRequests_; // user name -> request
+ TActorId NodeTrackerActor_;
+ THashMap<ui64, TQueueInfoPtr> QueuePerNodeTrackingSubscription;
+ ui64 MaxNodeTrackingSubscriptionId = 0;
+
+ THashSet<TQueueInfoPtr> QueuesWithGetNodeWaitingRequests;
struct TYcSearchEventsConfig {
TString Database;
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 73a7fa71cf2..1df5c5a87e7 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -1307,6 +1307,7 @@ const char* const GetQueuesListQuery = R"__(
'Account
'QueueName
'QueueState
+ 'FifoQueue
'CreatedTimestamp
'CustomQueueName
'DlqName
diff --git a/ydb/tests/functional/sqs/common/test_queues_managing.py b/ydb/tests/functional/sqs/common/test_queues_managing.py
index 75d265ed0eb..3f54b000184 100644
--- a/ydb/tests/functional/sqs/common/test_queues_managing.py
+++ b/ydb/tests/functional/sqs/common/test_queues_managing.py
@@ -260,7 +260,7 @@ class QueuesManagingTest(KikimrSqsTestBase):
master_is_updated = True
break
except RuntimeError as ex:
- assert str(ex).find('master session error') != -1
+ assert str(ex).find('master session error') != -1 or str(ex).find('failed because of an unknown error, exception or failure') != -1
time.sleep(0.5) # wait master update time
assert_that(master_is_updated)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/__init__.py b/ydb/tests/functional/sqs/merge_split_common_table/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/__init__.py
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py b/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py
new file mode 100644
index 00000000000..a1a9d72039b
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py
@@ -0,0 +1,6 @@
+from ydb.tests.functional.sqs.merge_split_common_table.test import TestSqsSplitMergeTables
+
+
+class TestSqsSplitMergeFifoTables(TestSqsSplitMergeTables):
+ def test_fifo_merge_split(self):
+ self.run_test(is_fifo=True)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/std/test.py b/ydb/tests/functional/sqs/merge_split_common_table/std/test.py
new file mode 100644
index 00000000000..3bbedf0df96
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/std/test.py
@@ -0,0 +1,6 @@
+from ydb.tests.functional.sqs.merge_split_common_table.test import TestSqsSplitMergeTables
+
+
+class TestSqsSplitMergeStdTables(TestSqsSplitMergeTables):
+ def test_std_merge_split(self):
+ self.run_test(is_fifo=False)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/test.py b/ydb/tests/functional/sqs/merge_split_common_table/test.py
new file mode 100644
index 00000000000..fd7dd064780
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/test.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+import ydb
+import random
+import string
+
+from ydb.tests.library.common.types import Erasure
+from ydb.tests.library.sqs.test_base import KikimrSqsTestBase
+
+
+def random_string(length):
+ return ''.join([random.choice(string.ascii_lowercase) for _ in range(length)])
+
+
+class TestSqsSplitMergeTables(KikimrSqsTestBase):
+ erasure = Erasure.BLOCK_4_2
+ use_in_memory_pdisks = False
+
+ @classmethod
+ def _setup_config_generator(cls):
+ config_generator = super(TestSqsSplitMergeTables, cls)._setup_config_generator()
+ config_generator.yaml_config['sqs_config']['masters_describer_update_time_ms'] = 1000
+ config_generator.yaml_config['sqs_config']['background_metrics_update_time_ms'] = 1000
+ return config_generator
+
+ def get_leaders_per_nodes(self):
+ nodes = len(self.cluster.nodes)
+ leaders = []
+ for node_index in range(nodes):
+ counters = self._get_counters(node_index, 'utils', counters_format='json')
+ labels = {
+ 'activity': 'SQS_QUEUE_LEADER_ACTOR',
+ 'sensor': 'ActorsAliveByActivity'
+ }
+ leader_actors = self._get_counter(counters, labels)
+ leaders.append(leader_actors['value'] if leader_actors else 0)
+ return leaders
+
+ def wait_leader_started(self, queues_count):
+ while True:
+ leaders_per_node = self.get_leaders_per_nodes()
+ logging.debug(f'wait all leaders {sum(leaders_per_node)} vs {queues_count} : {leaders_per_node}')
+ if sum(leaders_per_node) == queues_count:
+ return
+ time.sleep(1)
+
+ def alter_table(self, table_path):
+ logging.info(f'alter table {table_path}...')
+ settings = ydb.RetrySettings()
+ settings.max_retries = 1
+ session = ydb.retry_operation_sync(lambda: self._driver.table_client.session().create(), retry_settings=settings)
+
+ self.__column_to_force_split = 'column_for_tests'
+ ydb.retry_operation_sync(lambda: session.alter_table(
+ table_path,
+ alter_partitioning_settings=ydb.PartitioningSettings()
+ .with_min_partitions_count(1)
+ .with_partition_size_mb(1)
+ .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
+ .with_partitioning_by_load(ydb.FeatureFlag.ENABLED),
+ add_columns=(
+ ydb.Column(
+ self.__column_to_force_split,
+ ydb.OptionalType(ydb.PrimitiveType.String),
+ ),
+ ),
+ ))
+
+ def force_split(self, table_path):
+ logging.info(f'force split {table_path}...')
+ settings = ydb.RetrySettings()
+ settings.max_retries = 1
+ session = ydb.retry_operation_sync(lambda: self._driver.table_client.session().create(), retry_settings=settings)
+ session.transaction().execute(f'update `{table_path}` SET {self.__column_to_force_split}="{random_string(5*1024*1024)}"', commit_tx=True)
+
+ def get_nodes_with_leaders(self):
+ leaders_per_node = self.get_leaders_per_nodes()
+ return len(list(filter(bool, leaders_per_node)))
+
+ def send_messages(self, is_fifo, queue_urls, messages_count=1, message_length=16):
+ group_id = 'group' if is_fifo else None
+ for i in range(messages_count):
+ for queue_url in queue_urls:
+ self.seq_no += 1
+ self._send_message_and_assert(queue_url, random_string(message_length), seq_no=self.seq_no if is_fifo else None, group_id=group_id)
+
+ def run_test(self, is_fifo):
+ self._init_with_params(is_fifo, tables_format=1)
+ queues_count = 25
+ queue_urls = []
+
+ for index in range(queues_count):
+ queue_name = f'q_{index}_{self.queue_name}'
+ queue_urls.append(self._create_queue_and_assert(queue_name, is_fifo=is_fifo))
+ self.wait_leader_started(queues_count)
+ logging.info('all leaders started.')
+ assert self.get_nodes_with_leaders() > 1
+
+ self.send_messages(is_fifo, queue_urls)
+ logging.info('messages have been sent #1')
+
+ balancing_table_path = '/Root/SQS/.' + ('FIFO' if is_fifo else 'STD') + '/Messages'
+ self.alter_table(balancing_table_path)
+
+ self.send_messages(is_fifo, queue_urls)
+ logging.info('messages have been sent #2')
+
+ while True:
+ time.sleep(1)
+ leaders_per_node = self.get_leaders_per_nodes()
+ nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
+ logging.debug(f'wait merge... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if nodes_with_leaders == 1 and sum(leaders_per_node) == queues_count:
+ break
+
+ logging.info('all leaders on 1 node')
+
+ self.force_split(balancing_table_path)
+ self.send_messages(is_fifo, queue_urls, messages_count=2)
+ logging.info('messages have been sent #3')
+
+ while True:
+ time.sleep(1)
+ leaders_per_node = self.get_leaders_per_nodes()
+ nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
+ logging.info(f'wait split... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if nodes_with_leaders > 1 and sum(leaders_per_node) == queues_count:
+ break
+ logging.info(f'test finished. Leaders pre node : {leaders_per_node}')
diff --git a/ydb/tests/library/sqs/tables.py b/ydb/tests/library/sqs/tables.py
index 0381e810fab..e212e534881 100644
--- a/ydb/tests/library/sqs/tables.py
+++ b/ydb/tests/library/sqs/tables.py
@@ -24,21 +24,28 @@ def _create_table(root, session, table_name, columns, keys_count, queue_type=Non
table_path = get_table_path(root, table_name, queue_type)
keys = [name for name, _ in columns[:keys_count]]
columns = [ydb.Column(name, ydb.OptionalType(column_type)) for name, column_type in columns]
- ydb.retry_operation_sync(lambda: session.create_table(
- table_path,
- ydb.TableDescription()
- .with_primary_keys(*keys)
- .with_columns(*columns)
- .with_profile(
- ydb.TableProfile()
- .with_partitioning_policy(
- ydb.PartitioningPolicy()
- .with_auto_partitioning(
- ydb.AutoPartitioningPolicy.AUTO_SPLIT
- )
+
+ if queue_type:
+ ydb.retry_operation_sync(lambda: session.create_table(
+ table_path,
+ ydb.TableDescription()
+ .with_primary_keys(*keys)
+ .with_columns(*columns)
+ .with_uniform_partitions(10)
+ .with_partitioning_settings(
+ ydb.PartitioningSettings()
+ .with_min_partitions_count(10)
+ .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
+ .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
)
- )
- ))
+ ))
+ else:
+ ydb.retry_operation_sync(lambda: session.create_table(
+ table_path,
+ ydb.TableDescription()
+ .with_primary_keys(*keys)
+ .with_columns(*columns)
+ ))
def get_table_keys_for_queue(with_shard=False):
diff --git a/ydb/tests/library/sqs/test_base.py b/ydb/tests/library/sqs/test_base.py
index ba292399e58..9f78bde4828 100644
--- a/ydb/tests/library/sqs/test_base.py
+++ b/ydb/tests/library/sqs/test_base.py
@@ -259,7 +259,7 @@ class KikimrSqsTestBase(object):
config_generator = KikimrConfigGenerator(
erasure=cls.erasure,
use_in_memory_pdisks=cls.use_in_memory_pdisks,
- additional_log_configs={'SQS': LogLevels.INFO},
+ additional_log_configs={'SQS': LogLevels.DEBUG},
enable_sqs=True,
)
config_generator.yaml_config['sqs_config']['root'] = cls.sqs_root