author     alexbogo <alexbogo@ydb.tech>    2022-07-14 16:15:10 +0300
committer  alexbogo <alexbogo@ydb.tech>    2022-07-14 16:15:10 +0300
commit     e49b95a764c9bc6ecae8c27be33e68a226036049 (patch)
tree       c087f5b6676df5f0591d4d634c84577d7b7d16aa
parent     f9a769cdfc6999a2fe5d0da2e20c40af02bbb5f4 (diff)
download   ydb-e49b95a764c9bc6ecae8c27be33e68a226036049.tar.gz
[sqs] balancing leaders
init
-rw-r--r--   ydb/core/ymq/actor/CMakeLists.txt                               |   1
-rw-r--r--   ydb/core/ymq/actor/events.h                                     |  43
-rw-r--r--   ydb/core/ymq/actor/node_tracker.cpp                             | 394
-rw-r--r--   ydb/core/ymq/actor/node_tracker.h                               | 121
-rw-r--r--   ydb/core/ymq/actor/queues_list_reader.cpp                       |   1
-rw-r--r--   ydb/core/ymq/actor/service.cpp                                  | 250
-rw-r--r--   ydb/core/ymq/actor/service.h                                    |  12
-rw-r--r--   ydb/core/ymq/queues/std/queries.cpp                             |   1
-rw-r--r--   ydb/tests/functional/sqs/common/test_queues_managing.py        |   2
-rw-r--r--   ydb/tests/functional/sqs/merge_split_common_table/__init__.py  |   0
-rw-r--r--   ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py |   6
-rw-r--r--   ydb/tests/functional/sqs/merge_split_common_table/std/test.py  |   6
-rw-r--r--   ydb/tests/functional/sqs/merge_split_common_table/test.py      | 132
-rw-r--r--   ydb/tests/library/sqs/tables.py                                 |  35
-rw-r--r--   ydb/tests/library/sqs/test_base.py                              |   2
15 files changed, 889 insertions, 117 deletions
diff --git a/ydb/core/ymq/actor/CMakeLists.txt b/ydb/core/ymq/actor/CMakeLists.txt
index 329ab01f408..623ced76fe8 100644
--- a/ydb/core/ymq/actor/CMakeLists.txt
+++ b/ydb/core/ymq/actor/CMakeLists.txt
@@ -84,6 +84,7 @@ target_sources(core-ymq-actor PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/metering.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/migration.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/modify_permissions.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/node_tracker.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/proxy_actor.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/purge.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/ymq/actor/purge_queue.cpp
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 71bfaf13e7e..b0e631b6e21 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -126,6 +126,10 @@ struct TSqsEvents {
 
         EvCleanupQueryComplete,
 
+        EvNodeTrackerSubscribeRequest,
+        EvNodeTrackerUnsubscribeRequest,
+        EvNodeTrackerSubscriptionStatus,
+
         EvEnd,
     };
 
@@ -801,6 +805,7 @@ struct TSqsEvents {
         ui64 Version = 0;
         ui64 ShardsCount = 0;
         TInstant CreatedTimestamp;
+        bool IsFifo = false;
 
         bool operator<(const TQueueRecord& r) const {
             return std::tie(UserName, QueueName) < std::tie(r.UserName, r.QueueName);
@@ -890,6 +895,44 @@ struct TSqsEvents {
         TString Name;
         ui64 Type;
     };
+
+    struct TEvNodeTrackerSubscribeRequest
+        : public NActors::TEventLocal<TEvNodeTrackerSubscribeRequest, EvNodeTrackerSubscribeRequest>
+    {
+        explicit TEvNodeTrackerSubscribeRequest(
+            ui64 subscriptionId,
+            ui64 queueIdNumber,
+            bool isFifo,
+            std::optional<ui64> tabletId = {}
+        )
+            : SubscriptionId(subscriptionId)
+            , QueueIdNumber(queueIdNumber)
+            , IsFifo(isFifo)
+            , TabletId(tabletId)
+        {}
+        ui64 SubscriptionId;
+        ui64 QueueIdNumber;
+        bool IsFifo;
+        std::optional<ui64> TabletId;
+    };
+
+    struct TEvNodeTrackerUnsubscribeRequest
+        : public NActors::TEventLocal<TEvNodeTrackerUnsubscribeRequest, EvNodeTrackerUnsubscribeRequest>
+    {
+        TEvNodeTrackerUnsubscribeRequest(ui64 subscriptionId)
+            : SubscriptionId(subscriptionId)
+        {}
+        ui64 SubscriptionId;
+    };
+
+    struct TEvNodeTrackerSubscriptionStatus : public NActors::TEventLocal<TEvNodeTrackerSubscriptionStatus, EvNodeTrackerSubscriptionStatus> {
+        explicit TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId)
+            : SubscriptionId(subscriptionId)
+            , NodeId(nodeId)
+        {}
+        ui64 SubscriptionId;
+        ui32 NodeId;
+    };
 };
 
 } // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/node_tracker.cpp b/ydb/core/ymq/actor/node_tracker.cpp
new file mode 100644
index 00000000000..eab06da69ae
--- /dev/null
+++ b/ydb/core/ymq/actor/node_tracker.cpp
@@ -0,0 +1,394 @@
+#include "node_tracker.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/ymq/actor/cfg.h>
+#include <ydb/core/ymq/actor/serviceid.h>
+#include <ydb/core/ymq/queues/common/key_hashes.h>
+
+#include <util/string/vector.h>
+
+namespace {
+ ui64 GetKeyCellValue(const NKikimr::TCell& cell) {
+ return cell.IsNull() ? Max<ui64>() : cell.AsValue<ui64>();
+ }
+
+ std::tuple<ui64, ui64> GetKeysPrefix(const TConstArrayRef<NKikimr::TCell>& cells) {
+ if (cells.empty()) {
+ return {Max<ui64>(), Max<ui64>()};
+ }
+ return {GetKeyCellValue(cells[0]), GetKeyCellValue(cells[1])};
+ }
+
+ constexpr ui64 STD_PATH_SUBSCRIPTION_KEY = 1;
+ constexpr ui64 FIFO_PATH_SUBSCRIPTION_KEY = 2;
+}
+
+namespace NKikimr::NSQS {
+ TNodeTrackerActor::TSubscriberInfo::TSubscriberInfo(
+ ui64 queueIdNumber,
+ bool isFifo,
+ std::optional<ui64> specifiedLeaderTabletId,
+ std::optional<ui32> nodeId
+ )
+ : QueueIdNumber(queueIdNumber)
+ , IsFifo(isFifo)
+ , SpecifiedLeaderTabletId(specifiedLeaderTabletId)
+ , NodeId(nodeId)
+ {}
+
+ const char* TNodeTrackerActor::GetLogPrefix() {
+ return "[Node tracker] ";
+ }
+
+ TNodeTrackerActor::TNodeTrackerActor(NActors::TActorId schemeCacheActor)
+ : SchemeCacheActor(schemeCacheActor)
+ , TablePathSTD(NKikimr::SplitPath(Cfg().GetRoot() + "/.STD/Messages"))
+ , TablePathFIFO(NKikimr::SplitPath(Cfg().GetRoot() + "/.FIFO/Messages"))
+ {
+ }
+
+ void TNodeTrackerActor::Bootstrap(const NActors::TActorContext& ctx) {
+ ParentActor = MakeSqsServiceID(SelfId().NodeId());
+ Become(&TNodeTrackerActor::WorkFunc);
+ ScheduleDescribeTables(TDuration::Zero(), ctx);
+ Schedule(CLEANUP_UNUSED_TABLETS_PERIOD, new TEvents::TEvWakeup());
+ LOG_SQS_DEBUG(GetLogPrefix() << "bootstrap on node=" << SelfId().NodeId());
+ }
+
+ void TNodeTrackerActor::HandleWakeup(TEvWakeup::TPtr&, const NActors::TActorContext& ctx) {
+ // remove tablets that have had no subscribers for CLEANUP_UNUSED_TABLETS_PERIOD
+ for (auto it = LastAccessOfTabletWithoutSubscribers.begin(); it != LastAccessOfTabletWithoutSubscribers.end();) {
+ ui64 tabletId = it->first;
+ TInstant lastAccess = it->second;
+ auto currentIt = it++;
+ if (ctx.Now() - lastAccess >= CLEANUP_UNUSED_TABLETS_PERIOD) {
+ auto infoIt = TabletsInfo.find(tabletId);
+ if (infoIt != TabletsInfo.end()) {
+ ClosePipeToTablet(infoIt->second);
+ TabletsInfo.erase(infoIt);
+ LastAccessOfTabletWithoutSubscribers.erase(currentIt);
+ } else {
+ LOG_SQS_ERROR(GetLogPrefix() << "unknown tabletId=" << tabletId << " with last access at " << lastAccess << " in unused tablets cleanup");
+ }
+ }
+ }
+
+ Schedule(CLEANUP_UNUSED_TABLETS_PERIOD, new TEvents::TEvWakeup());
+ }
+
+ void TNodeTrackerActor::ScheduleDescribeTables(TDuration runAfter, const NActors::TActorContext& ctx) {
+ LOG_SQS_NOTICE(GetLogPrefix() << "schedule describe tables after " << runAfter);
+ auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+ //navigateRequest->DatabaseName = Cfg().GetRoot();
+
+ navigateRequest->ResultSet.resize(2);
+ navigateRequest->ResultSet.front().Path = TablePathSTD;
+ navigateRequest->ResultSet.back().Path = TablePathFIFO;
+ ctx.ExecutorThread.ActorSystem->Schedule(
+ runAfter,
+ new IEventHandle(
+ SchemeCacheActor,
+ SelfId(),
+ new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release())
+ )
+ );
+ }
+
+ void TNodeTrackerActor::DescribeTablesFailed(const TString& error, const NActors::TActorContext& ctx) {
+ LOG_SQS_ERROR(GetLogPrefix() << "describe tables failed: " << error);
+ DescribeTablesRetyPeriod = Min(
+ DESCRIBE_TABLES_PERIOD_MAX,
+ 2 * Max(DescribeTablesRetyPeriod, TDuration::MilliSeconds(100))
+ );
+ ScheduleDescribeTables(DescribeTablesRetyPeriod, ctx);
+ }
+
+ void TNodeTrackerActor::HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
+ ui64 tabletId = ev->Get()->TabletId;
+ auto it = TabletsInfo.find(tabletId);
+ if (it == TabletsInfo.end()) {
+ LOG_SQS_WARN(GetLogPrefix() << "connected to unrequired tablet. Tablet id: [" << tabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
+ return;
+ }
+
+ auto& info = it->second;
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ LOG_SQS_WARN(GetLogPrefix() << "failed to connect to tablet " << tabletId << " dead=" << ev->Get()->Dead);
+
+ if (ev->Get()->Dead) {
+ MoveSubscribersFromTablet(tabletId, info, ctx);
+ return;
+ }
+ ReconnectToTablet(tabletId);
+ return;
+ }
+
+ LOG_SQS_DEBUG(GetLogPrefix() << "connected to tabletId [" << tabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
+ info.PipeServer = ev->Get()->ServerId;
+ ui32 nodeId = info.PipeServer.NodeId();
+ for (auto& [id, subscriber] : info.Subscribers) {
+ if (!subscriber->NodeId || subscriber->NodeId.value() != nodeId) {
+ subscriber->NodeId = nodeId;
+ AnswerForSubscriber(id, nodeId);
+ }
+ }
+ }
+
+ void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId) {
+ Send(ParentActor, new TSqsEvents::TEvNodeTrackerSubscriptionStatus(subscriptionId, nodeId));
+ }
+
+ void TNodeTrackerActor::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&) {
+ auto it = TabletsInfo.find(ev->Get()->TabletId);
+ if (it != TabletsInfo.end()) {
+ LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << ev->Get()->TabletId << " disconnected");
+ ReconnectToTablet(ev->Get()->TabletId);
+ } else {
+ LOG_SQS_WARN(GetLogPrefix() << " disconnected from unrequired tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
+ }
+ }
+
+ void TNodeTrackerActor::ClosePipeToTablet(TTabletInfo& info) {
+ if (info.PipeClient) {
+ NTabletPipe::CloseClient(SelfId(), info.PipeClient);
+ info.PipeClient = info.PipeServer = TActorId();
+ }
+ }
+
+ TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::ConnectToTablet(ui64 tabletId, bool isReconnect) {
+ LOG_SQS_DEBUG(GetLogPrefix() << "connect to tablet " << tabletId << " is_reconnect=" << isReconnect);
+ NTabletPipe::TClientConfig cfg;
+ cfg.AllowFollower = false;
+ cfg.CheckAliveness = true;
+ cfg.RetryPolicy = {.RetryLimitCount = 3, .MinRetryTime = TDuration::MilliSeconds(100), .DoFirstRetryInstantly = !isReconnect};
+
+ auto& info = TabletsInfo[tabletId];
+ ClosePipeToTablet(info);
+ info.PipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, cfg));
+ return info;
+ }
+
+ TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::ReconnectToTablet(ui64 tabletId) {
+ return ConnectToTablet(tabletId, true);
+ }
+
+ TNodeTrackerActor::TTabletInfo& TNodeTrackerActor::GetTabletInfo(ui64 tabletId) {
+ auto it = TabletsInfo.find(tabletId);
+ if (it == TabletsInfo.end()) {
+ return ConnectToTablet(tabletId);
+ }
+ return it->second;
+ }
+
+ void TNodeTrackerActor::RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx) {
+ ui64 subscriptionId = request->Get()->SubscriptionId;
+ LOG_SQS_DEBUG(GetLogPrefix() << "remove subscriber with id=" << subscriptionId);
+ auto it = TabletPerSubscriptionId.find(subscriptionId);
+ if (it != TabletPerSubscriptionId.end()) {
+ auto tabletIt = TabletsInfo.find(it->second);
+ if (tabletIt != TabletsInfo.end()) {
+ auto& info = tabletIt->second;
+ info.Subscribers.erase(subscriptionId);
+ if (info.Subscribers.empty()) {
+ LastAccessOfTabletWithoutSubscribers[tabletIt->first] = ctx.Now();
+ }
+ } else {
+ LOG_SQS_WARN("Node tracker removing subscription " << subscriptionId << "that is not found in the tablet information " << tabletIt->first);
+ }
+ TabletPerSubscriptionId.erase(it);
+ } else {
+ auto subscriptionIt = SubscriptionsAwaitingPartitionsUpdates.find(subscriptionId);
+ if (subscriptionIt != SubscriptionsAwaitingPartitionsUpdates.end()) {
+ SubscriptionsAwaitingPartitionsUpdates.erase(subscriptionIt);
+ } else {
+ LOG_SQS_WARN("Node tracker removing unknown subscription " << subscriptionId);
+ }
+ }
+ }
+
+ bool TNodeTrackerActor::SubscriberMustWait(const TSubscriberInfo& subscriber) const {
+ if (subscriber.SpecifiedLeaderTabletId) {
+ return false;
+ }
+ // partitions of the common table have not yet been received
+ if (subscriber.IsFifo) {
+ return TabletsPerEndKeyRangeFIFO.empty();
+ }
+ return TabletsPerEndKeyRangeSTD.empty();
+ }
+
+ void TNodeTrackerActor::AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx) {
+ auto& req = *request->Get();
+ LOG_SQS_DEBUG(GetLogPrefix() << "add subscriber on init with id=" << req.SubscriptionId
+ << " queue_id_number=" << req.QueueIdNumber << " is_fifo=" << req.IsFifo
+ << " tablet_id=" << req.TabletId.value_or(0)
+ );
+ auto subscriber = std::make_unique<TSubscriberInfo>(req.QueueIdNumber, req.IsFifo, req.TabletId);
+
+ if (SubscriberMustWait(*subscriber)) {
+ SubscriptionsAwaitingPartitionsUpdates[req.SubscriptionId] = std::move(subscriber);
+ return;
+ }
+
+ AddSubscriber(req.SubscriptionId, std::move(subscriber), ctx);
+ }
+
+ void TNodeTrackerActor::AddSubscriber(ui64 subscriptionId, std::unique_ptr<TSubscriberInfo> subscriber, const NActors::TActorContext&) {
+ ui64 tabletId = GetTabletId(*subscriber);
+ TabletPerSubscriptionId[subscriptionId] = tabletId;
+ TTabletInfo& info = GetTabletInfo(tabletId);
+
+ if (info.PipeServer) {
+ ui32 nodeId = info.PipeServer.NodeId();
+ subscriber->NodeId = nodeId;
+ AnswerForSubscriber(subscriptionId, nodeId);
+ }
+ LOG_SQS_DEBUG(GetLogPrefix() << "add subscriber queue_id_number=" << subscriber->QueueIdNumber
+ << " leader_tablet_specified=" << subscriber->SpecifiedLeaderTabletId.has_value()
+ << " tablet_id=" << tabletId
+ << " node_resolved=" << subscriber->NodeId.has_value() << "/" << subscriber->NodeId.value_or(0));
+ info.Subscribers[subscriptionId] = std::move(subscriber);
+ if (info.Subscribers.size() == 1) {
+ LastAccessOfTabletWithoutSubscribers.erase(tabletId);
+ }
+ }
+
+ ui64 TNodeTrackerActor::GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const {
+ auto it = tabletsPerEndKeyRange.lower_bound(keyPrefix);
+ Y_VERIFY(it != tabletsPerEndKeyRange.end());
+ return it->second;
+ }
+
+ ui64 TNodeTrackerActor::GetTabletId(const TSubscriberInfo& subscriber) const {
+ if (subscriber.SpecifiedLeaderTabletId) {
+ return subscriber.SpecifiedLeaderTabletId.value();
+ }
+
+ TKeyPrefix keyPrefix;
+ if (subscriber.IsFifo) {
+ keyPrefix = {GetKeysHash(subscriber.QueueIdNumber), subscriber.QueueIdNumber};
+ return GetTabletId(TabletsPerEndKeyRangeFIFO, keyPrefix);
+ }
+
+ keyPrefix = {GetKeysHash(subscriber.QueueIdNumber, 0 /*shard*/), subscriber.QueueIdNumber};
+ return GetTabletId(TabletsPerEndKeyRangeSTD, keyPrefix);
+ }
+
+ void TNodeTrackerActor::MoveSubscribersFromTablet(ui64 tabletId, const NActors::TActorContext& ctx) {
+ auto it = TabletsInfo.find(tabletId);
+ if (it != TabletsInfo.end()) {
+ MoveSubscribersFromTablet(tabletId, it->second, ctx);
+ }
+ }
+
+ void TNodeTrackerActor::MoveSubscribersFromTablet(ui64 tabletId, TTabletInfo& info, const NActors::TActorContext& ctx) {
+ LOG_SQS_DEBUG(GetLogPrefix() << "move subscribers from " << tabletId);
+ if (!info.Subscribers.empty()) {
+ for (auto& [id, subscriber] : info.Subscribers) {
+ SubscriptionsAwaitingPartitionsUpdates[id] = std::move(subscriber);
+ TabletPerSubscriptionId.erase(id);
+ }
+ info.Subscribers.clear();
+ LastAccessOfTabletWithoutSubscribers[tabletId] = ctx.Now();
+ }
+ }
+
+ void TNodeTrackerActor::MoveSubscribersAfterKeyRangeChanged(
+ const TMap<TKeyPrefix, ui64>& current,
+ const TMap<TKeyPrefix, ui64>& actual,
+ const NActors::TActorContext& ctx
+ ) {
+ TKeyPrefix lastCurrent{0, 0};
+ TKeyPrefix lastActual{0, 0};
+
+ auto currentIt = current.begin();
+ auto actualIt = actual.begin();
+ while (currentIt != current.end() && actualIt != actual.end()) {
+ std::pair<TKeyPrefix, TKeyPrefix> intervalC(lastCurrent, currentIt->first);
+ std::pair<TKeyPrefix, TKeyPrefix> intervalA(lastActual, actualIt->first);
+ if (intervalA == intervalC) {
+ if (currentIt->second != actualIt->second) {
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ }
+ ++currentIt;
+ ++actualIt;
+ } else if (intervalC.second < intervalA.first) { // don't intersect (CCCCC]...(AAAAA]
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ ++currentIt;
+ } else if (intervalA.second < intervalC.first) { // don't intersect (AAAAA]...(CCCCC]
+ ++actualIt;
+ } else { // intersect
+ MoveSubscribersFromTablet(currentIt->second, ctx);
+ if (intervalC.second <= intervalA.second) {
+ ++currentIt;
+ } else {
+ ++actualIt;
+ }
+ }
+ lastCurrent = intervalC.second;
+ lastActual = intervalA.second;
+ }
+ Y_VERIFY(currentIt == current.end());
+ }
+
+ void TNodeTrackerActor::UpdateKeyRanges(
+ TMap<TKeyPrefix, ui64>& currentTabletsPerEndKeyRange,
+ const NKikimrSchemeOp::TPathDescription& description,
+ const NActors::TActorContext& ctx
+ ) {
+ TMap<TKeyPrefix, ui64> tabletsPerEndKeyRange;
+ for (auto part : description.GetTablePartitions()) {
+ TSerializedCellVec endKeyPrefix(part.GetEndOfRangeKeyPrefix());
+ auto cells = endKeyPrefix.GetCells();
+ auto endKeyRange = GetKeysPrefix(cells);
+ tabletsPerEndKeyRange[endKeyRange] = part.GetDatashardId();
+ }
+
+ MoveSubscribersAfterKeyRangeChanged(currentTabletsPerEndKeyRange, tabletsPerEndKeyRange, ctx);
+ currentTabletsPerEndKeyRange = std::move(tabletsPerEndKeyRange);
+ }
+
+ void TNodeTrackerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const NActors::TActorContext& ctx) {
+ LOG_SQS_DEBUG(GetLogPrefix() << "got tables description.");
+ const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
+ Y_VERIFY(result->ResultSet.size() == 2);
+ for (auto result : result->ResultSet) {
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ DescribeTablesFailed(TStringBuilder() << "describe tables failed : " << result.ToString(), ctx);
+ return;
+ }
+ LOG_SQS_INFO(GetLogPrefix() << "got table description: " << result.ToString());
+ bool isFifo = (result.Path == TablePathFIFO);
+ ui64 pathSubscriptonKey = isFifo ? FIFO_PATH_SUBSCRIPTION_KEY : STD_PATH_SUBSCRIPTION_KEY;
+ Send(SchemeCacheActor, new TEvTxProxySchemeCache::TEvWatchPathId(result.TableId.PathId, pathSubscriptonKey));
+ }
+ DescribeTablesRetyPeriod = DESCRIBE_TABLES_PERIOD_MIN;
+ }
+
+ void TNodeTrackerActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const NActors::TActorContext& ctx) {
+ const auto& describeResult = *ev->Get()->Result;
+ const auto& pathDescription = describeResult.GetPathDescription();
+ LOG_SQS_INFO(GetLogPrefix() << "got actual description for ["
+ << ev->Get()->PathId << " / " << describeResult.GetPath() << "] partitions=" << pathDescription.GetTablePartitions().size());
+
+ bool isFifo = ev->Get()->Key == FIFO_PATH_SUBSCRIPTION_KEY;
+ if (isFifo) {
+ UpdateKeyRanges(TabletsPerEndKeyRangeFIFO, pathDescription, ctx);
+ } else {
+ UpdateKeyRanges(TabletsPerEndKeyRangeSTD, pathDescription, ctx);
+ }
+
+ auto it = SubscriptionsAwaitingPartitionsUpdates.begin();
+ while (it != SubscriptionsAwaitingPartitionsUpdates.end()) {
+ if (SubscriberMustWait(*it->second)) {
+ ++it;
+ } else {
+ AddSubscriber(it->first, std::move(it->second), ctx);
+ auto itToRemove = it;
+ ++it;
+ SubscriptionsAwaitingPartitionsUpdates.erase(itToRemove);
+ }
+ }
+ }
+
+} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/node_tracker.h b/ydb/core/ymq/actor/node_tracker.h
new file mode 100644
index 00000000000..aa4dd941615
--- /dev/null
+++ b/ydb/core/ymq/actor/node_tracker.h
@@ -0,0 +1,121 @@
+#pragma once
+
+#include "events.h"
+#include "log.h"
+
+#include <ydb/core/base/tablet_pipe.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NSQS {
+
+class TNodeTrackerActor : public TActorBootstrapped<TNodeTrackerActor> {
+private:
+    static constexpr TDuration CLEANUP_UNUSED_TABLETS_PERIOD = TDuration::Minutes(10);
+    static constexpr TDuration DESCRIBE_TABLES_PERIOD_MIN = TDuration::Zero();
+    static constexpr TDuration DESCRIBE_TABLES_PERIOD_MAX = TDuration::Seconds(5);
+
+    using TKeyPrefix = std::tuple<ui64, ui64>;
+
+private:
+    struct TSubscriberInfo {
+        using TPtr = std::unique_ptr<TSubscriberInfo>;
+
+        TSubscriberInfo(
+            ui64 queueIdNumber,
+            bool isFifo,
+            std::optional<ui64> specifiedLeaderTabletId,
+            std::optional<ui32> nodeId = std::nullopt
+        );
+
+        const ui64 QueueIdNumber;
+        const bool IsFifo;
+        const std::optional<ui64> SpecifiedLeaderTabletId;
+
+        std::optional<ui32> NodeId;
+    };
+
+    struct TTabletInfo {
+        TActorId PipeClient;
+        TActorId PipeServer;
+        THashMap<ui64, TSubscriberInfo::TPtr> Subscribers;
+    };
+
+public:
+    static const char* GetLogPrefix();
+
+    TNodeTrackerActor(NActors::TActorId schemeCacheActor);
+    void Bootstrap(const NActors::TActorContext& ctx);
+
+    void WorkFunc(TAutoPtr<IEventHandle>& ev, const NActors::TActorContext& ctx) {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(TEvWakeup, HandleWakeup);
+            HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleCacheNavigateResponse);
+            HFunc(TSqsEvents::TEvNodeTrackerSubscribeRequest, AddSubscriber);
+            HFunc(TSqsEvents::TEvNodeTrackerUnsubscribeRequest, RemoveSubscriber);
+            HFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected);
+            HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected);
+            HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+            default:
+                LOG_SQS_ERROR("Unknown type of event came to SQS node tracker actor: " << ev->Type << " (" << ev->GetBase()->ToString() << "), sender: " << ev->Sender);
+        }
+    }
+
+    void HandleWakeup(TEvWakeup::TPtr&, const NActors::TActorContext& ctx);
+    void ScheduleDescribeTables(TDuration runAfter, const NActors::TActorContext& ctx);
+    void DescribeTablesFailed(const TString& error, const NActors::TActorContext& ctx);
+
+    void HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
+    void HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&);
+    void ClosePipeToTablet(TTabletInfo& info);
+    TTabletInfo& ConnectToTablet(ui64 tabletId, bool isReconnect = false);
+    TTabletInfo& ReconnectToTablet(ui64 tabletId);
+    TTabletInfo& GetTabletInfo(ui64 tabletId);
+
+
+    ui64 GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const;
+    ui64 GetTabletId(const TSubscriberInfo& subscriber) const;
+
+    void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId);
+    void RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
+    bool SubscriberMustWait(const TSubscriberInfo& subscriber) const;
+    void AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
+    void AddSubscriber(ui64 subscriptionId, std::unique_ptr<TSubscriberInfo> subscriber, const NActors::TActorContext&);
+    void MoveSubscribersFromTablet(ui64 tabletId, const NActors::TActorContext& ctx);
+    void MoveSubscribersFromTablet(ui64 tabletId, TTabletInfo& info, const NActors::TActorContext& ctx);
+    void MoveSubscribersAfterKeyRangeChanged(
+        const TMap<TKeyPrefix, ui64>& current,
+        const TMap<TKeyPrefix, ui64>& actual,
+        const NActors::TActorContext& ctx
+    );
+
+    void UpdateKeyRanges(
+        TMap<TKeyPrefix, ui64>& currentTabletsPerEndKeyRange,
+        const NKikimrSchemeOp::TPathDescription& description,
+        const NActors::TActorContext& ctx
+    );
+
+    void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const NActors::TActorContext& ctx);
+    void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const NActors::TActorContext& ctx);
+
+private:
+    TDuration DescribeTablesRetyPeriod = DESCRIBE_TABLES_PERIOD_MIN;
+
+    TMap<TKeyPrefix, ui64> TabletsPerEndKeyRangeSTD;
+    TMap<TKeyPrefix, ui64> TabletsPerEndKeyRangeFIFO;
+
+    THashMap<ui64, TTabletInfo> TabletsInfo;
+    THashMap<ui64, TSubscriberInfo::TPtr> SubscriptionsAwaitingPartitionsUpdates;
+    THashMap<ui64, ui64> TabletPerSubscriptionId;
+    THashMap<ui64, TInstant> LastAccessOfTabletWithoutSubscribers;
+
+    TActorId ParentActor;
+    TActorId SchemeCacheActor;
+
+    const TVector<TString> TablePathSTD;
+    const TVector<TString> TablePathFIFO;
+
+    const TDuration UpdatePeriod;
+};
+
+} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/queues_list_reader.cpp b/ydb/core/ymq/actor/queues_list_reader.cpp
index 2cdbcfe2cea..09168e6d0d3 100644
--- a/ydb/core/ymq/actor/queues_list_reader.cpp
+++ b/ydb/core/ymq/actor/queues_list_reader.cpp
@@ -132,6 +132,7 @@ void TQueuesListReader::OnQueuesList(const TSqsEvents::TEvExecuted::TRecord& rec
             rec.ShardsCount = row["Shards"];
             rec.DlqName = row["DlqName"];
             rec.CreatedTimestamp = TInstant::MilliSeconds(ui64(row["CreatedTimestamp"]));
+            rec.IsFifo = row["FifoQueue"];
         }
 
         const bool truncated = val["truncated"];
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index c8fdfe0a915..ecec77eb174 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -12,6 +12,7 @@
 #include "user_settings_names.h"
 #include "user_settings_reader.h"
 #include "index_events_processor.h"
+#include "node_tracker.h"
 
 #include <ydb/public/lib/value/value.h>
 #include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
@@ -55,10 +56,12 @@ using NKikimr::NClient::TValue;
 const TString LEADER_CREATE_REASON_USER_REQUEST = "UserRequestOnNode";
 const TString LEADER_CREATE_REASON_LOCAL_TABLET = "LocalTablet";
 const TString LEADER_DESTROY_REASON_LAST_REF = "LastReference";
-const TString LEADER_DESTROY_REASON_TABLET_PIPE_CLOSED = "TabletPipeClosed";
+const TString LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE = "LeaderTabletOnAnotherNode";
+const TString LEADER_DESTROY_REASON_REMOVE_INFO = "RemoveQueueInfo";
 
 constexpr ui64 LIST_USERS_WAKEUP_TAG = 1;
 constexpr ui64 LIST_QUEUES_WAKEUP_TAG = 2;
+constexpr ui64 CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG = 3;
 
 constexpr size_t EARLY_REQUEST_USERS_LIST_MAX_BUDGET = 10;
 constexpr i64 EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET = 5; // per user
@@ -69,7 +72,7 @@ bool IsInternalFolder(const TString& folder) {
 
 struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
     TQueueInfo(
-            TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, TString customName,
+            TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, bool isFifo, TString customName,
             TString folderId, ui32 tablesFormat, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters,
             const TIntrusivePtr<TFolderCounters>& folderCounters,
             const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser,
@@ -84,6 +87,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
         , ShardsCount_(shardsCount)
         , RootUrl_(std::move(rootUrl))
         , LeaderTabletId_(leaderTabletId)
+        , IsFifo_(isFifo)
         , Counters_(userCounters->CreateQueueCounters(QueueName_, FolderId_, insertCounters))
         , UserCounters_(userCounters)
         , FolderCounters_(folderCounters)
@@ -92,36 +96,19 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
     {
     }
 
-    void ConnectToLeaderTablet(bool firstTime = true) {
-        if (ConnectingToLeaderTablet_) {
-            return;
-        }
-        ClosePipeToLeaderTablet();
-        ConnectingToLeaderTablet_ = true;
-        NTabletPipe::TClientConfig cfg;
-        cfg.AllowFollower = false;
-        cfg.CheckAliveness = true;
-        cfg.RetryPolicy = {.RetryLimitCount = 3, .MinRetryTime = TDuration::MilliSeconds(100), .DoFirstRetryInstantly = firstTime};
-        PipeClient_ = TActivationContext::Register(NTabletPipe::CreateClient(SelfId(), LeaderTabletId_, cfg));
-        LOG_SQS_DEBUG("Connect to leader tablet [" << LeaderTabletId_ << "] for queue [" << UserName_ << "/" << QueueName_ << "]. Pipe client actor: " << PipeClient_);
-    }
-
-    void SetLeaderPipeServer(const TActorId& pipeServer) {
-        LeaderPipeServer_ = pipeServer;
-
-        const ui64 nodeId = LeaderPipeServer_.NodeId();
-        if (nodeId == SelfId().NodeId()) {
-            IncLocalLeaderRef(LEADER_CREATE_REASON_LOCAL_TABLET); // ref for service
-        }
+    bool LeaderMustBeOnCurrentNode() const {
+        return LeaderNodeId_ && LeaderNodeId_.value() == SelfId().NodeId();
     }
 
-    void ClosePipeToLeaderTablet() {
-        if (LeaderPipeServer_.NodeId() == SelfId().NodeId()) {
-            DecLocalLeaderRef(LEADER_DESTROY_REASON_TABLET_PIPE_CLOSED); // ref for service
+    void SetLeaderNodeId(ui32 nodeId) {
+        if (LeaderNodeId_ && LeaderNodeId_ == nodeId) {
+            return;
         }
-        if (PipeClient_) {
-            NTabletPipe::CloseClient(SelfId(), PipeClient_);
-            PipeClient_ = LeaderPipeServer_ = TActorId();
+        LeaderNodeId_ = nodeId;
+        if (LeaderMustBeOnCurrentNode()) {
+            StartLocalLeader(LEADER_CREATE_REASON_LOCAL_TABLET);
+        } else {
+            StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE);
         }
     }
 
@@ -140,6 +127,12 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
         }
     }
 
+    void StopLocalLeaderIfNeeded(const TString& reason) {
+        if (!LeaderMustBeOnCurrentNode() && LocalLeaderRefCount_ == 0) {
+            StopLocalLeader(reason);
+        }
+    }
+
     void StopLocalLeader(const TString& reason) {
         if (LocalLeader_) {
             Counters_ = Counters_->GetCountersForNotLeaderNode();
@@ -162,9 +155,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
     void DecLocalLeaderRef(const TString& reason) {
         Y_VERIFY(LocalLeaderRefCount_ > 0);
         --LocalLeaderRefCount_;
-        if (LocalLeaderRefCount_ == 0) {
-            StopLocalLeader(reason);
-        }
+        StopLocalLeaderIfNeeded(reason);
     }
 
     TActorIdentity SelfId() const {
@@ -180,20 +171,21 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
     ui64 ShardsCount_;
     TString RootUrl_;
     ui64 LeaderTabletId_ = 0;
+    bool IsFifo_ = false;
    TIntrusivePtr<TQueueCounters> Counters_;
     TIntrusivePtr<TUserCounters> UserCounters_;
     TIntrusivePtr<TFolderCounters> FolderCounters_;
-    TActorId PipeClient_;
-    TActorId LeaderPipeServer_;
+    std::optional<ui32> LeaderNodeId_;
+    ui64 NodeTrackingSubscriptionId = 0;
+
     TActorId LocalLeader_;
     TActorId SchemeCache_;
     ui64 LocalLeaderRefCount_ = 0;
     TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResourcesForUser_;
 
     // State machine
-    bool ConnectingToLeaderTablet_ = false;
-    TInstant DisconnectedFrom_ = TInstant::Now();
     THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_;
+    TInstant NodeUnknownSince_ = TInstant::Now();
 };
 
 struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
@@ -300,6 +292,7 @@ void TSqsService::Bootstrap() {
     AggregatedUserCounters_->ShowDetailedCounters(TInstant::Max());
 
     InitSchemeCache();
+    NodeTrackerActor_ = Register(new TNodeTrackerActor(SchemeCache_));
 
     Register(new TUserSettingsReader(AggregatedUserCounters_->GetTransactionCounters()));
     QueuesListReader_ = Register(new TQueuesListReader(AggregatedUserCounters_->GetTransactionCounters()));
@@ -341,8 +334,7 @@ STATEFN(TSqsService::StateFunc) {
         hFunc(TEvWakeup, HandleWakeup);
         hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleDescribeSchemeResult);
         hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
-        hFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeClientDisconnected);
-        hFunc(TEvTabletPipe::TEvClientConnected, HandlePipeClientConnected);
+        hFunc(TSqsEvents::TEvNodeTrackerSubscriptionStatus, HandleNodeTrackingSubscriptionStatus);
        hFunc(TSqsEvents::TEvGetConfiguration, HandleGetConfiguration);
         hFunc(TSqsEvents::TEvSqsRequest, HandleSqsRequest);
         hFunc(TSqsEvents::TEvInsertQueueCounters, HandleInsertQueueCounters);
@@ -436,15 +428,23 @@ void TSqsService::HandleGetLeaderNodeForQueueRequest(TSqsEvents::TEvGetLeaderNod
         return;
     }
 
-    if (!queueIt->second->LeaderPipeServer_) {
+    auto queuePtr = queueIt->second;
+    if (!queuePtr->LeaderNodeId_) {
         LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
         RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] is waiting for connection to leader tablet.");
-        auto& queue = queueIt->second;
-        queue->GetLeaderNodeRequests_.emplace(std::move(ev));
+
+        queuePtr->GetLeaderNodeRequests_.emplace(std::move(ev));
+        if (QueuesWithGetNodeWaitingRequests.empty()) {
+            Schedule(
+                TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs()),
+                new TEvWakeup(CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG)
+            );
+        }
+        QueuesWithGetNodeWaitingRequests.insert(queuePtr);
         return;
     }
 
-    const ui64 nodeId = queueIt->second->LeaderPipeServer_.NodeId();
+    const ui32 nodeId = queuePtr->LeaderNodeId_.value();
     RLOG_SQS_REQ_DEBUG(reqId, "Leader node for queue [" << userName << "/" << queueName << "] is " << nodeId);
     Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, nodeId));
 }
@@ -737,50 +737,28 @@ TSqsService::TUserInfoPtr TSqsService::GetUserOrWait(TAutoPtr<TEvent>& ev) {
     return userIt->second;
 }
 
-void TSqsService::HandlePipeClientConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
-    auto queueIt = LeaderTabletIdToQueue_.find(ev->Get()->TabletId);
-    if (queueIt == LeaderTabletIdToQueue_.end()) {
-        LOG_SQS_WARN("Connected to unknown queue leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
-        return;
-    }
-    const auto& queue = queueIt->second;
-    queue->ConnectingToLeaderTablet_ = false;
-
-    if (ev->Get()->Status != NKikimrProto::OK) {
-        LOG_SQS_WARN("Failed to connect to queue [" << queue->UserName_ << "/" << queue->QueueName_ << "] leader tablet. Tablet id: [" << ev->Get()->TabletId << "]. Status: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
-        const TInstant now = TActivationContext::Now();
-        const TDuration timeDisconnecned = now - queue->DisconnectedFrom_;
-        const TDuration leaderConnectTimeout = TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs());
-        if (timeDisconnecned >= leaderConnectTimeout) {
-            for (auto& req : queue->GetLeaderNodeRequests_) {
-                RLOG_SQS_REQ_WARN(req->Get()->RequestId, "Can't connect to leader tablet for " << timeDisconnecned);
-                Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader));
-            }
-            queue->GetLeaderNodeRequests_.clear();
-        }
-        queue->ConnectToLeaderTablet(false);
+void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev) {
+    ui64 subscriptionId = ev->Get()->SubscriptionId;
+    auto it = QueuePerNodeTrackingSubscription.find(subscriptionId);
+    if (it == QueuePerNodeTrackingSubscription.end()) {
+        LOG_SQS_WARN("Get node tracking status for unknown subscription id: " << subscriptionId);
+        Send(NodeTrackerActor_, new TSqsEvents::TEvNodeTrackerUnsubscribeRequest(subscriptionId));
         return;
     }
-
-    LOG_SQS_DEBUG("Connected to queue [" << queueIt->second->UserName_ << "/" << queueIt->second->QueueName_ << "] leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
-    queue->SetLeaderPipeServer(ev->Get()->ServerId);
-    for (auto& req : queue->GetLeaderNodeRequests_) {
-        RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Connected to leader tablet. Node id: " << queue->LeaderPipeServer_.NodeId());
-        Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, queue->LeaderPipeServer_.NodeId()));
-    }
-    queue->GetLeaderNodeRequests_.clear();
-}
-
-void TSqsService::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
-    auto queueIt = LeaderTabletIdToQueue_.find(ev->Get()->TabletId);
-    if (queueIt != LeaderTabletIdToQueue_.end()) {
-        queueIt->second->ConnectingToLeaderTablet_ = false;
-        queueIt->second->DisconnectedFrom_ = TActivationContext::Now();
-        LOG_SQS_DEBUG("Disconnected from queue [" << queueIt->second->UserName_ << "/" << queueIt->second->QueueName_ << "] leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
-        queueIt->second->ConnectToLeaderTablet(false);
-    } else {
-        LOG_SQS_WARN("Disconnected from unknown queue leader. Tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
+    auto queuePtr = it->second;
+    auto& queue = *queuePtr;
+    auto nodeId = ev->Get()->NodeId;
+    queue.SetLeaderNodeId(nodeId);
+    LOG_SQS_DEBUG(
+        "Got node leader for queue [" << queue.UserName_ << "/" << queue.QueueName_
+        << "]. Node: " << nodeId << " subscription id: " << subscriptionId
+    );
+    for (auto& req : queue.GetLeaderNodeRequests_) {
+        RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Got node leader. Node id: " << nodeId);
+        Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, nodeId));
     }
+    queue.GetLeaderNodeRequests_.clear();
+    QueuesWithGetNodeWaitingRequests.erase(queuePtr);
 }
 
 void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
@@ -798,9 +776,9 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
         auto oldListIt = user->Queues_.begin();
         while (oldListIt != user->Queues_.end() && newListIt != ev->Get()->SortedQueues.end() && newListIt->UserName == user->UserName_) {
             if (oldListIt->first == newListIt->QueueName) { // the same queue
-                if (oldListIt->second->LeaderTabletId_ != newListIt->LeaderTabletId) {
-                    LOG_SQS_WARN("Leader tablet id for queue " << oldListIt->first << " has been changed from "
-                        << oldListIt->second->LeaderTabletId_ << " to " << newListIt->LeaderTabletId << " (queue was recreated)");
+                if (oldListIt->second->Version_ != newListIt->Version) {
+                    LOG_SQS_WARN("Queue version for queue " << oldListIt->first << " has been changed from "
+                        << oldListIt->second->Version_ << " to " << newListIt->Version << " (queue was recreated)");
                     THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> oldQueueRequests;
                     oldQueueRequests.swap(oldListIt->second->GetLeaderNodeRequests_);
@@ -813,9 +791,12 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
                         newListIt->TablesFormat,
                         newListIt->Version,
                         newListIt->ShardsCount,
-                        newListIt->CreatedTimestamp);
-                    Y_VERIFY(oldListIt->second->ConnectingToLeaderTablet_);
+                        newListIt->CreatedTimestamp,
+                        newListIt->IsFifo);
                     oldQueueRequests.swap(oldListIt->second->GetLeaderNodeRequests_);
+                    if (!oldListIt->second->GetLeaderNodeRequests_.empty()) {
+                        QueuesWithGetNodeWaitingRequests.insert(oldListIt->second);
+                    }
                 }
                 ++oldListIt;
                 ++newListIt;
@@ -832,7 +813,8 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
                     newListIt->TablesFormat,
                     newListIt->Version,
                     newListIt->ShardsCount,
-                    newListIt->CreatedTimestamp);
+                    newListIt->CreatedTimestamp,
+                    newListIt->IsFifo);
                 ++oldListIt;
                 ++newListIt;
             }
@@ -851,7 +833,8 @@ void TSqsService::HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev) {
                 newListIt->TablesFormat,
                 newListIt->Version,
                 newListIt->ShardsCount,
-                newListIt->CreatedTimestamp);
+                newListIt->CreatedTimestamp,
+                newListIt->IsFifo);
             ++newListIt;
         }
 
@@ -1018,13 +1001,12 @@ void TSqsService::RemoveQueue(const TString& userName, const TString& queue) {
     }
 
     auto queuePtr = queueIt->second;
-    queuePtr->ClosePipeToLeaderTablet();
+    CancleNodeTrackingSubscription(queuePtr);
     for (auto& req : queuePtr->GetLeaderNodeRequests_) {
         RLOG_SQS_REQ_DEBUG(req->Get()->RequestId, "Removing queue [" << req->Get()->UserName << "/" << req->Get()->QueueName << "] from sqs service info");
         Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue));
     }
     queuePtr->GetLeaderNodeRequests_.clear();
-    LeaderTabletIdToQueue_.erase(queuePtr->LeaderTabletId_);
     userIt->second->QueueByNameAndFolder_.erase(std::make_pair(queuePtr->CustomName_, queuePtr->FolderId_));
 
     auto queuesCount = userIt->second->CountQueuesInFolder(queuePtr->FolderId_);
     if (!queuesCount) {
@@ -1042,12 +1024,12 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
                                                                               const ui32 tablesFormat,
                                                                               const ui64 version,
                                                                               const ui64 shardsCount,
-                                                                              const TInstant createdTimestamp) {
+                                                                              const TInstant createdTimestamp,
+                                                                              bool isFifo) {
     auto user = MutableUser(userName, false); // don't move requests because they are already moved in our caller
     const TInstant now = TActivationContext::Now();
     const TInstant timeToInsertCounters = createdTimestamp + TDuration::MilliSeconds(Cfg().GetQueueCountersExportDelayMs());
     const bool insertCounters = now >= timeToInsertCounters;
-
     auto folderCntrIter = user->FolderCounters_.find(folderId);
     if (folderCntrIter == user->FolderCounters_.end()) {
         folderCntrIter = user->FolderCounters_.insert(std::make_pair(folderId, user->Counters_->CreateFolderCounters(folderId, true))).first;
@@ -1057,12 +1039,11 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
     }
 
     auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(
-            userName, queue, RootUrl_, leaderTabletId, customName, folderId, tablesFormat, version, shardsCount,
+            userName, queue, RootUrl_, leaderTabletId, isFifo, customName, folderId, tablesFormat, version, shardsCount,
            user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters)))
     ).first;
     auto queueInfo = ret->second;
-    LeaderTabletIdToQueue_[leaderTabletId] = queueInfo;
     user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo);
 
     {
@@ -1109,12 +1090,54 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
         }
         user->GetQueueFolderIdAndCustomNameRequests_.erase(requests.first, requests.second);
     }
-
-    queueInfo->ConnectToLeaderTablet();
-    LOG_SQS_DEBUG("Created queue record. Queue: [" << queue << "]. Leader tablet id: [" << leaderTabletId << "]. Pipe client actor: " << queueInfo->PipeClient_);
+
+    CreateNodeTrackingSubscription(queueInfo);
+    LOG_SQS_DEBUG("Created queue record. Queue: [" << queue << "]. QueueIdNumber: " << queueInfo->Version_ << ". Leader tablet id: [" << leaderTabletId << "]. Node tracker subscription: " << queueInfo->NodeTrackingSubscriptionId);
     return ret;
 }
 
+void TSqsService::CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo) {
+    Y_VERIFY(!queueInfo->NodeTrackingSubscriptionId);
+    queueInfo->NodeTrackingSubscriptionId = ++MaxNodeTrackingSubscriptionId;
+    LOG_SQS_DEBUG("Create node tracking subscription queue_id_number=" << queueInfo->Version_
+        << " tables_format=" << queueInfo->TablesFormat_ << " subscription_id=" << queueInfo->NodeTrackingSubscriptionId
+    );
+
+    QueuePerNodeTrackingSubscription[queueInfo->NodeTrackingSubscriptionId] = queueInfo;
+
+    std::optional<ui64> fixedLeaderTabletId;
+    if (queueInfo->TablesFormat_ == 0) {
+        fixedLeaderTabletId = queueInfo->LeaderTabletId_;
+    }
+    Send(
+        NodeTrackerActor_,
+        new TSqsEvents::TEvNodeTrackerSubscribeRequest(
+            queueInfo->NodeTrackingSubscriptionId,
+            queueInfo->Version_,
+            queueInfo->IsFifo_,
+            fixedLeaderTabletId
+        )
+    );
+}
+
+void TSqsService::CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo) {
+    LOG_SQS_DEBUG("Cancle node tracking subscription queue_id_number=" << queueInfo->Version_
+        << " tables_format=" << queueInfo->TablesFormat_ << " subscription_id=" << queueInfo->NodeTrackingSubscriptionId
+    );
+    Y_VERIFY(queueInfo->NodeTrackingSubscriptionId);
+    auto id = queueInfo->NodeTrackingSubscriptionId;
+    queueInfo->NodeTrackingSubscriptionId = 0;
+
+    QueuePerNodeTrackingSubscription.erase(id);
+    queueInfo->LeaderNodeId_.reset();
+    queueInfo->StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_REMOVE_INFO);
+
+    Send(
+        NodeTrackerActor_,
+        new TSqsEvents::TEvNodeTrackerUnsubscribeRequest(id)
+    );
+}
+
 void TSqsService::AnswerNoUserToRequests() {
     AnswerNoUserToRequests(GetLeaderNodeRequests_);
     AnswerNoUserToRequests(GetConfigurationRequests_);
@@ -1146,6 +1169,32 @@ void TSqsService::AnswerErrorToRequests(const TUserInfoPtr& user) {
     AnswerErrorToRequests(user, user->CountQueuesRequests_);
 }
 
+void TSqsService::ProcessConnectTimeoutToLeader() {
+    TDuration nextRunAfter = TDuration::Max();
+    TDuration leaderConnectTimeout = TDuration::MilliSeconds(Cfg().GetLeaderConnectTimeoutMs());
+    auto it = QueuesWithGetNodeWaitingRequests.begin();
+    while (it != QueuesWithGetNodeWaitingRequests.end()) {
+        auto& queue = **it;
+        auto nodeUnknownTime = TActivationContext::Now() - queue.NodeUnknownSince_;
+        auto timeLeft = leaderConnectTimeout - nodeUnknownTime;
+        if (timeLeft == TDuration::Zero()) {
+            for (auto& req : queue.GetLeaderNodeRequests_) {
+                RLOG_SQS_REQ_WARN(req->Get()->RequestId, "Can't connect to leader tablet for " << nodeUnknownTime);
+                Send(req->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(req->Get()->RequestId, req->Get()->UserName, req->Get()->QueueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader));
+            }
+            queue.GetLeaderNodeRequests_.clear();
+            auto toRemoveIt = it++;
+            QueuesWithGetNodeWaitingRequests.erase(toRemoveIt);
+        } else {
+            nextRunAfter = Min(nextRunAfter, timeLeft);
+            ++it;
+        }
+    }
+    if (nextRunAfter != TDuration::Max()) {
+        Schedule(nextRunAfter, new TEvWakeup(CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG));
+    }
+}
+
 void TSqsService::HandleWakeup(TEvWakeup::TPtr& ev) {
     Y_VERIFY(ev->Get()->Tag != 0);
     switch (ev->Get()->Tag) {
@@ -1167,6 +1216,9 @@ void TSqsService::HandleWakeup(TEvWakeup::TPtr& ev) {
             RequestSqsQueuesList();
         }
         break;
+    case CONNECT_TIMEOUT_TO_LEADER_WAKEUP_TAG:
+        ProcessConnectTimeoutToLeader();
+        break;
     }
 }
 
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index 32588a07508..666db5f4be7 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -59,7 +59,11 @@ private:
     void HandleInsertQueueCounters(TSqsEvents::TEvInsertQueueCounters::TPtr& ev);
     void HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::TPtr& ev);
     void HandleQueuesList(TSqsEvents::TEvQueuesList::TPtr& ev);
+    void HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTrackerSubscriptionStatus::TPtr& ev);
+    void CreateNodeTrackingSubscription(TQueueInfoPtr queueInfo);
+    void CancleNodeTrackingSubscription(TQueueInfoPtr queueInfo);
+    void ProcessConnectTimeoutToLeader();
 
     void ScheduleRequestSqsUsersList();
     void RequestSqsUsersList();
@@ -73,7 +77,7 @@ private:
     void RemoveUser(const TString& userName);
     std::map<TString, TQueueInfoPtr>::iterator AddQueue(const TString& userName, const TString& queue, ui64 leaderTabletId,
                                                         const TString& customName, const TString& folderId, const ui32 tablesFormat, const ui64 version,
-                                                        const ui64 shardsCount, const TInstant createdTimestamp);
+                                                        const ui64 shardsCount, const TInstant createdTimestamp, bool isFifo);
 
     void AnswerNoUserToRequests();
     void AnswerNoQueueToRequests(const TUserInfoPtr& user);
@@ -138,7 +142,6 @@ private:
     std::shared_ptr<TAlignedPagePoolCounters> AllocPoolCounters_;
     TIntrusivePtr<TUserCounters> AggregatedUserCounters_;
     TUsersMap Users_;
-    THashMap<ui64, TQueueInfoPtr> LeaderTabletIdToQueue_;
     THashMap<TActorId, TQueueInfoPtr> LocalLeaderRefs_; // referer -> queue info
     TActorId SchemeCache_;
     TActorId QueuesListReader_;
@@ -159,6 +162,11 @@ private:
     THashMultiMap<TString, TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr> GetQueueFolderIdAndCustomNameRequests_; // user name -> request
     THashMultiMap<TString, TSqsEvents::TEvCountQueues::TPtr> CountQueuesRequests_; // user name -> request
 
+    TActorId NodeTrackerActor_;
+    THashMap<ui64, TQueueInfoPtr> QueuePerNodeTrackingSubscription;
+    ui64 MaxNodeTrackingSubscriptionId = 0;
+
+    THashSet<TQueueInfoPtr> QueuesWithGetNodeWaitingRequests;
 
     struct TYcSearchEventsConfig {
         TString Database;
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index 73a7fa71cf2..1df5c5a87e7 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -1307,6 +1307,7 @@ const char* const GetQueuesListQuery = R"__(
                 'Account
                 'QueueName
                 'QueueState
+                'FifoQueue
                 'CreatedTimestamp
                 'CustomQueueName
                 'DlqName
diff --git a/ydb/tests/functional/sqs/common/test_queues_managing.py b/ydb/tests/functional/sqs/common/test_queues_managing.py
index 75d265ed0eb..3f54b000184 100644
--- a/ydb/tests/functional/sqs/common/test_queues_managing.py
+++ b/ydb/tests/functional/sqs/common/test_queues_managing.py
@@ -260,7 +260,7 @@ class QueuesManagingTest(KikimrSqsTestBase):
                     master_is_updated = True
                     break
                 except RuntimeError as ex:
-                    assert str(ex).find('master session error') != -1
+                    assert str(ex).find('master session error') != -1 or str(ex).find('failed because of an unknown error, exception or failure') != -1
                 time.sleep(0.5)  # wait master update time
 
         assert_that(master_is_updated)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/__init__.py b/ydb/tests/functional/sqs/merge_split_common_table/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/__init__.py
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py b/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py
new file mode 100644
index 00000000000..a1a9d72039b
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/fifo/test.py
@@ -0,0 +1,6 @@
+from ydb.tests.functional.sqs.merge_split_common_table.test import TestSqsSplitMergeTables
+
+
+class TestSqsSplitMergeFifoTables(TestSqsSplitMergeTables):
+ def test_fifo_merge_split(self):
+ self.run_test(is_fifo=True)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/std/test.py b/ydb/tests/functional/sqs/merge_split_common_table/std/test.py
new file mode 100644
index 00000000000..3bbedf0df96
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/std/test.py
@@ -0,0 +1,6 @@
+from ydb.tests.functional.sqs.merge_split_common_table.test import TestSqsSplitMergeTables
+
+
+class TestSqsSplitMergeStdTables(TestSqsSplitMergeTables):
+ def test_std_merge_split(self):
+ self.run_test(is_fifo=False)
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/test.py b/ydb/tests/functional/sqs/merge_split_common_table/test.py
new file mode 100644
index 00000000000..fd7dd064780
--- /dev/null
+++ b/ydb/tests/functional/sqs/merge_split_common_table/test.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+import ydb
+import random
+import string
+
+from ydb.tests.library.common.types import Erasure
+from ydb.tests.library.sqs.test_base import KikimrSqsTestBase
+
+
+def random_string(length):
+ return ''.join([random.choice(string.ascii_lowercase) for _ in range(length)])
+
+
+class TestSqsSplitMergeTables(KikimrSqsTestBase):
+ erasure = Erasure.BLOCK_4_2
+ use_in_memory_pdisks = False
+
+ @classmethod
+ def _setup_config_generator(cls):
+ config_generator = super(TestSqsSplitMergeTables, cls)._setup_config_generator()
+ config_generator.yaml_config['sqs_config']['masters_describer_update_time_ms'] = 1000
+ config_generator.yaml_config['sqs_config']['background_metrics_update_time_ms'] = 1000
+ return config_generator
+
+ def get_leaders_per_nodes(self):
+ nodes = len(self.cluster.nodes)
+ leaders = []
+ for node_index in range(nodes):
+ counters = self._get_counters(node_index, 'utils', counters_format='json')
+ labels = {
+ 'activity': 'SQS_QUEUE_LEADER_ACTOR',
+ 'sensor': 'ActorsAliveByActivity'
+ }
+ leader_actors = self._get_counter(counters, labels)
+ leaders.append(leader_actors['value'] if leader_actors else 0)
+ return leaders
+
+ def wait_leader_started(self, queues_count):
+ while True:
+ leaders_per_node = self.get_leaders_per_nodes()
+ logging.debug(f'wait all leaders {sum(leaders_per_node)} vs {queues_count} : {leaders_per_node}')
+ if sum(leaders_per_node) == queues_count:
+ return
+ time.sleep(1)
+
+ def alter_table(self, table_path):
+ logging.info(f'alter table {table_path}...')
+ settings = ydb.RetrySettings()
+ settings.max_retries = 1
+ session = ydb.retry_operation_sync(lambda: self._driver.table_client.session().create(), retry_settings=settings)
+
+ self.__column_to_force_split = 'column_for_tests'
+ ydb.retry_operation_sync(lambda: session.alter_table(
+ table_path,
+ alter_partitioning_settings=ydb.PartitioningSettings()
+ .with_min_partitions_count(1)
+ .with_partition_size_mb(1)
+ .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
+ .with_partitioning_by_load(ydb.FeatureFlag.ENABLED),
+ add_columns=(
+ ydb.Column(
+ self.__column_to_force_split,
+ ydb.OptionalType(ydb.PrimitiveType.String),
+ ),
+ ),
+ ))
+
+ def force_split(self, table_path):
+ logging.info(f'force split {table_path}...')
+ settings = ydb.RetrySettings()
+ settings.max_retries = 1
+ session = ydb.retry_operation_sync(lambda: self._driver.table_client.session().create(), retry_settings=settings)
+ session.transaction().execute(f'update `{table_path}` SET {self.__column_to_force_split}="{random_string(5*1024*1024)}"', commit_tx=True)
+
+ def get_nodes_with_leaders(self):
+ leaders_per_node = self.get_leaders_per_nodes()
+ return len(list(filter(bool, leaders_per_node)))
+
+ def send_messages(self, is_fifo, queue_urls, messages_count=1, message_length=16):
+ group_id = 'group' if is_fifo else None
+ for i in range(messages_count):
+ for queue_url in queue_urls:
+ self.seq_no += 1
+ self._send_message_and_assert(queue_url, random_string(message_length), seq_no=self.seq_no if is_fifo else None, group_id=group_id)
+
+ def run_test(self, is_fifo):
+ self._init_with_params(is_fifo, tables_format=1)
+ queues_count = 25
+ queue_urls = []
+
+ for index in range(queues_count):
+ queue_name = f'q_{index}_{self.queue_name}'
+ queue_urls.append(self._create_queue_and_assert(queue_name, is_fifo=is_fifo))
+ self.wait_leader_started(queues_count)
+ logging.info('all leaders started.')
+ assert self.get_nodes_with_leaders() > 1
+
+ self.send_messages(is_fifo, queue_urls)
+ logging.info('messages have been sent #1')
+
+ balancing_table_path = '/Root/SQS/.' + ('FIFO' if is_fifo else 'STD') + '/Messages'
+ self.alter_table(balancing_table_path)
+
+ self.send_messages(is_fifo, queue_urls)
+ logging.info('messages have been sent #2')
+
+ while True:
+ time.sleep(1)
+ leaders_per_node = self.get_leaders_per_nodes()
+ nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
+ logging.debug(f'wait merge... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if nodes_with_leaders == 1 and sum(leaders_per_node) == queues_count:
+ break
+
+ logging.info('all leaders on 1 node')
+
+ self.force_split(balancing_table_path)
+ self.send_messages(is_fifo, queue_urls, messages_count=2)
+ logging.info('messages have been sent #3')
+
+ while True:
+ time.sleep(1)
+ leaders_per_node = self.get_leaders_per_nodes()
+ nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
+ logging.info(f'wait split... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if nodes_with_leaders > 1 and sum(leaders_per_node) == queues_count:
+ break
+ logging.info(f'test finished. Leaders per node : {leaders_per_node}')
diff --git a/ydb/tests/library/sqs/tables.py b/ydb/tests/library/sqs/tables.py
index 0381e810fab..e212e534881 100644
--- a/ydb/tests/library/sqs/tables.py
+++ b/ydb/tests/library/sqs/tables.py
@@ -24,21 +24,28 @@ def _create_table(root, session, table_name, columns, keys_count, queue_type=Non
     table_path = get_table_path(root, table_name, queue_type)
     keys = [name for name, _ in columns[:keys_count]]
     columns = [ydb.Column(name, ydb.OptionalType(column_type)) for name, column_type in columns]
-    ydb.retry_operation_sync(lambda: session.create_table(
-        table_path,
-        ydb.TableDescription()
-        .with_primary_keys(*keys)
-        .with_columns(*columns)
-        .with_profile(
-            ydb.TableProfile()
-            .with_partitioning_policy(
-                ydb.PartitioningPolicy()
-                .with_auto_partitioning(
-                    ydb.AutoPartitioningPolicy.AUTO_SPLIT
-                )
+
+    if queue_type:
+        ydb.retry_operation_sync(lambda: session.create_table(
+            table_path,
+            ydb.TableDescription()
+            .with_primary_keys(*keys)
+            .with_columns(*columns)
+            .with_uniform_partitions(10)
+            .with_partitioning_settings(
+                ydb.PartitioningSettings()
+                .with_min_partitions_count(10)
+                .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
+                .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
             )
-        )
-    ))
+        ))
+    else:
+        ydb.retry_operation_sync(lambda: session.create_table(
+            table_path,
+            ydb.TableDescription()
+            .with_primary_keys(*keys)
+            .with_columns(*columns)
+        ))
 
 
 def get_table_keys_for_queue(with_shard=False):
diff --git a/ydb/tests/library/sqs/test_base.py b/ydb/tests/library/sqs/test_base.py
index ba292399e58..9f78bde4828 100644
--- a/ydb/tests/library/sqs/test_base.py
+++ b/ydb/tests/library/sqs/test_base.py
@@ -259,7 +259,7 @@ class KikimrSqsTestBase(object):
         config_generator = KikimrConfigGenerator(
             erasure=cls.erasure,
             use_in_memory_pdisks=cls.use_in_memory_pdisks,
-            additional_log_configs={'SQS': LogLevels.INFO},
+            additional_log_configs={'SQS': LogLevels.DEBUG},
             enable_sqs=True,
         )
         config_generator.yaml_config['sqs_config']['root'] = cls.sqs_root