From dec300ed4d56c60d5d2948a2cd3b6f5678444279 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 15 Apr 2024 19:01:36 +0500 Subject: Rebalance partition for read to other session for old SDK (#3734) --- ydb/core/persqueue/events/internal.h | 4 +- ydb/core/persqueue/partition.cpp | 2 + ydb/core/persqueue/partition.h | 2 +- ydb/core/persqueue/partition_read.cpp | 2 +- ydb/core/persqueue/read_balancer.cpp | 55 +++++++++++++++++++--- ydb/core/persqueue/read_balancer.h | 17 +++++-- ydb/core/persqueue/read_balancer__types.cpp | 21 +++++++-- ydb/core/persqueue/ut/autoscaling_ut.cpp | 4 ++ .../persqueue/ut/common/autoscaling_ut_common.cpp | 30 ++++++------ ydb/core/protos/pqconfig.proto | 5 ++ 10 files changed, 110 insertions(+), 32 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 44b5cf31e0a..58a11679b4a 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1090,9 +1090,11 @@ struct TEvPQ { struct TEvReadingPartitionStatusRequest : public TEventPB { TEvReadingPartitionStatusRequest() = default; - TEvReadingPartitionStatusRequest(const TString& consumer, ui32 partitionId) { + TEvReadingPartitionStatusRequest(const TString& consumer, ui32 partitionId, ui32 generaion, ui64 cookie) { Record.SetConsumer(consumer); Record.SetPartitionId(partitionId); + Record.SetGeneration(generaion); + Record.SetCookie(cookie); } }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 76829604b4c..0bf4f565f89 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -631,6 +631,8 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { NKikimrPQ::TStatusResponse::TPartResult result; result.SetPartition(Partition.InternalPartitionId); + result.SetGeneration(TabletGeneration); + result.SetCookie(++PQRBCookie); if (DiskIsFull || WaitingForSubDomainQuota(ctx)) { result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index c8840e87900..613dc00147e 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -679,6 +679,7 @@ private: TDuration InitDuration; bool InitDone; bool NewPartition; + ui64 PQRBCookie = 0; THashMap Owners; THashSet OwnerPipes; @@ -781,4 +782,3 @@ private: }; } // namespace NKikimr::NPQ - diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 3ff237ae45c..a6f7f80a959 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -30,7 +30,7 @@ namespace NKikimr::NPQ { static const ui32 MAX_USER_ACTS = 1000; void TPartition::SendReadingFinished(const TString& consumer) { - Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId)); + Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie)); } void TPartition::FillReadFromTimestamps(const TActorContext& ctx) { diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 1090f5651d5..aaeb5ac1968 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -718,12 +718,14 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c for (const auto& partRes : record.GetPartResult()) { auto partitionId = partRes.GetPartition(); + auto generation = partRes.GetGeneration(); + auto cookie = partRes.GetCookie(); for (const auto& consumer : partRes.GetConsumerResult()) { if (consumer.GetReadingFinished()) { auto it = ClientsInfo.find(consumer.GetConsumer()); if (it != ClientsInfo.end()) { auto& clientInfo = it->second; - if (clientInfo.IsReadeable(partitionId) && clientInfo.SetCommittedState(partitionId)) { + if (clientInfo.IsReadeable(partitionId) && clientInfo.SetCommittedState(partitionId, generation, cookie)) { clientInfo.ProccessReadingFinished(partRes.GetPartition(), ctx); } } @@ -1127,8 +1129,8 @@ bool TPersQueueReadBalancer::TClientInfo::IsFinished(ui32 partitionId) const { return it->second.IsFinished(); } -bool TPersQueueReadBalancer::TClientInfo::SetCommittedState(ui32 partitionId) { - return ReadingPartitionStatus[partitionId].SetCommittedState(); +bool TPersQueueReadBalancer::TClientInfo::SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie) { + return ReadingPartitionStatus[partitionId].SetCommittedState(generation, cookie); } TPersQueueReadBalancer::TClientGroupInfo* TPersQueueReadBalancer::TClientInfo::FindGroup(ui32 partitionId) { @@ -1671,8 +1673,11 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions( const TActorContext& ctx) { std::deque freePartitions = std::move(FreePartitions); + std::deque toOtherPartitions; for (auto& [sessionKey, sessionInfo] : SessionsInfo) { + auto& pipe = sessionKey.first; + ui32 realDesired = (allowPlusOne > 0) ? desired + 1 : desired; if (allowPlusOne > 0) { --allowPlusOne; @@ -1687,8 +1692,13 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions( while (req > 0 && !freePartitions.empty()) { auto partitionId = freePartitions.front(); if (partitionPredicate(partitionId)) { - --req; - LockPartition(sessionKey.first, sessionInfo, partitionId, ctx); + auto& status = ClientInfo.GetPartitionReadingStatus(partitionId); + if (status.BalanceToOtherPipe() && status.LastPipe != pipe || SessionsInfo.size() == 1) { + --req; + LockPartition(pipe, sessionInfo, partitionId, ctx); + } else { + toOtherPartitions.push_back(partitionId); + } } else { FreePartitions.push_back(partitionId); } @@ -1700,6 +1710,38 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions( } } + if (!toOtherPartitions.empty()) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, + GetPrefix() << "client: "<< ClientId << " balance group " << Group << " partitions " << JoinRange(", ", toOtherPartitions.begin(), toOtherPartitions.end()) << " to other sessions"); + + for (auto& [sessionKey, sessionInfo] : SessionsInfo) { + auto& pipe = sessionKey.first; + ui32 realDesired = desired + 1; + + ssize_t actual = actualExtractor(sessionInfo); + if (actual >= realDesired) { + continue; + } + + ssize_t req = ((ssize_t)realDesired) - actual; + size_t possibleIterations = toOtherPartitions.size(); + while (req > 0 && !toOtherPartitions.empty() && possibleIterations) { + auto partitionId = toOtherPartitions.front(); + toOtherPartitions.pop_front(); + + auto& status = ClientInfo.GetPartitionReadingStatus(partitionId); + if (status.LastPipe != pipe) { + --req; + --possibleIterations; + LockPartition(pipe, sessionInfo, partitionId, ctx); + } else { + --possibleIterations; + toOtherPartitions.push_back(partitionId); + } + } + } + } + FreePartitions.insert(FreePartitions.end(), freePartitions.begin(), freePartitions.end()); } @@ -1958,7 +2000,7 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPt return; } - if (clientInfo.SetCommittedState(partitionId)) { + if (clientInfo.SetCommittedState(partitionId, r.GetGeneration(), r.GetCookie())) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "The offset of the partition " << partitionId << " was commited by " << r.GetConsumer()); @@ -2048,6 +2090,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvReadingPartitionFinishedReq << ". Scheduled release of the partition for re-reading. Delay=" << delay << " seconds," << " firstMessage=" << r.GetStartedReadingFromEndOffset() << ", " << GetSdkDebugString(r.GetScaleAwareSDK())); + status.LastPipe = ev->Sender; ctx.Schedule(TDuration::Seconds(delay), new TEvPQ::TEvWakeupReleasePartition(r.GetConsumer(), partitionId, status.Cookie)); } } diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index 640541f2d86..b3375c4c379 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -212,10 +212,17 @@ private: size_t Iteration = 0; ui64 Cookie = 0; - // Return true if the reading of the partition has been finished and children's partition are readable. + TActorId LastPipe; + + // Generation of PQ-tablet and cookie for synchronization of commit information. + ui32 PartitionGeneration; + ui64 PartitionCookie; + + // Return true if the reading of the partition has been finished and children's partitions are readable. bool IsFinished() const; // Return true if children's partitions can't be balance separately. bool NeedReleaseChildren() const; + bool BalanceToOtherPipe() const; // Called when reading from a partition is started. // Return true if the reading of the partition has been finished before. @@ -226,7 +233,7 @@ private: // Called when the partition is inactive and commited offset is equal to EndOffset. // Return true if the commited status changed. - bool SetCommittedState(); + bool SetCommittedState(ui32 generation, ui64 cookie); // Called when the partition reading finished. // Return true if the reading status changed. bool SetFinishedState(bool scaleAwareSDK, bool startedReadingFromEndOffset); @@ -260,10 +267,10 @@ private: }; struct TClientGroupInfo { - TClientGroupInfo(const TClientInfo& clientInfo) + TClientGroupInfo(TClientInfo& clientInfo) : ClientInfo(clientInfo) {} - const TClientInfo& ClientInfo; + TClientInfo& ClientInfo; TString ClientId; TString Topic; @@ -352,7 +359,7 @@ private: bool IsReadeable(ui32 partitionId) const; bool IsFinished(ui32 partitionId) const; - bool SetCommittedState(ui32 partitionId); + bool SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie); TClientGroupInfo* FindGroup(ui32 partitionId); }; diff --git a/ydb/core/persqueue/read_balancer__types.cpp b/ydb/core/persqueue/read_balancer__types.cpp index a232c492583..803d933ede4 100644 --- a/ydb/core/persqueue/read_balancer__types.cpp +++ b/ydb/core/persqueue/read_balancer__types.cpp @@ -15,6 +15,10 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::NeedReleaseChildren() cons return !(Commited || (ReadingFinished && !ScaleAwareSDK)); } +bool TPersQueueReadBalancer::TReadingPartitionStatus::BalanceToOtherPipe() const { + return LastPipe && !Commited && ReadingFinished && !ScaleAwareSDK; +} + bool TPersQueueReadBalancer::TReadingPartitionStatus::StartReading() { return std::exchange(ReadingFinished, false); } @@ -25,9 +29,16 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::StopReading() { return NeedReleaseChildren(); } -bool TPersQueueReadBalancer::TReadingPartitionStatus::SetCommittedState() { - Iteration = 0; - return !std::exchange(Commited, true); +bool TPersQueueReadBalancer::TReadingPartitionStatus::SetCommittedState(ui32 generation, ui64 cookie) { + if (PartitionGeneration < generation || (PartitionGeneration == generation && PartitionCookie < cookie)) { + Iteration = 0; + PartitionGeneration = generation; + PartitionCookie = cookie; + + return !std::exchange(Commited, true); + } + + return false; } bool TPersQueueReadBalancer::TReadingPartitionStatus::SetFinishedState(bool scaleAwareSDK, bool startedReadingFromEndOffset) { @@ -44,6 +55,9 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::SetFinishedState(bool scal } else { ++Iteration; } + if (scaleAwareSDK || currentStatus) { + LastPipe = TActorId(); + } return currentStatus && !previousStatus; } @@ -54,6 +68,7 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::Reset() { ReadingFinished = false; Commited = false; ++Cookie; + LastPipe = TActorId(); return result; }; diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 9becbd72300..34ce7796bd2 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -283,6 +283,7 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { TTestReadSession readSession1("Session-0", client, Max(), false); readSession1.Offsets[0] = 1; readSession1.WaitAndAssertPartitions({0, 1, 2}, "Must read all exists partitions because read the partition 0 from offset 1"); + readSession1.Offsets[0] = 0; TTestReadSession readSession2("Session-1", client, Max(), false, 0); readSession2.Offsets[0] = 0; @@ -298,7 +299,10 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { readSession2.Assert({0}, p2, ""); readSession2.WaitAndAssertPartitions({}, "Partition must be released because reding finished"); + readSession2.Run(); + readSession1.WaitAndAssertPartitions({}, "Partitions must be read only from Session-1"); + readSession1.WaitAndAssertPartitions({0}, "Partition 0 must rebalance to other sessions (Session-0)"); readSession1.Close(); readSession2.Close(); diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp index c7f6bc72cdd..c6f26a9c295 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp @@ -29,7 +29,7 @@ TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) { void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) { Sleep(TDuration::Seconds(1)); - Cerr << "ALTER_SCHEME: " << scheme << Endl; + Cerr << "ALTER_SCHEME: " << scheme << Endl << Flush; const auto sender = setup.GetRuntime().AllocateEdgeActor(); const auto request = CreateRequest(txId, CreateTransaction("/Root", scheme)); @@ -144,7 +144,7 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si << ", message=" << message.GetData() << ", seqNo=" << message.GetSeqNo() << ", offset=" << message.GetOffset() - << Endl; + << Endl << Flush; ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), message.GetSeqNo(), message.GetOffset(), @@ -165,14 +165,14 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si readSettings.EventHandlers_.StartPartitionSessionHandler( [&] (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl; + Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); Modify([&](std::set& s) { s.insert(partitionId); }); if (Offsets.contains(partitionId)) { - Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl; + Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl << Flush; ev.Confirm(Offsets[partitionId], TMaybe()); } else { - Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl; + Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush; ev.Confirm(); } }); @@ -180,26 +180,26 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si readSettings.EventHandlers_.StopPartitionSessionHandler( [&] (TReadSessionEvent::TStopPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl; + Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); Modify([&](std::set& s) { s.erase(partitionId); }); - Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl; + Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl << Flush; ev.Confirm(); }); readSettings.EventHandlers_.PartitionSessionClosedHandler( [&] (TReadSessionEvent::TPartitionSessionClosedEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl; + Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); Modify([&](std::set& s) { s.erase(partitionId); }); - Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl; + Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl << Flush; }); readSettings.EventHandlers_.SessionClosedHandler( [Name=name] (const TSessionClosedEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl; + Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush; }); Session = client.CreateReadSession(readSettings); @@ -219,17 +219,17 @@ void TTestReadSession::Commit() { } void TTestReadSession::Acquire() { - Cerr << ">>>>> " << Name << " Acquire()" << Endl; + Cerr << ">>>>> " << Name << " Acquire()" << Endl << Flush; Semaphore.Acquire(); } void TTestReadSession::Release() { - Cerr << ">>>>> " << Name << " Release()" << Endl; + Cerr << ">>>>> " << Name << " Release()" << Endl << Flush; Semaphore.Release(); } NThreading::TFuture> TTestReadSession::Wait(std::set partitions, const TString& message) { - Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl; + Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl << Flush; with_lock (Lock) { ExpectedPartitions = partitions; @@ -244,14 +244,14 @@ NThreading::TFuture> TTestReadSession::Wait(std::set pa } void TTestReadSession::Assert(const std::set& expected, NThreading::TFuture> f, const TString& message) { - Cerr << ">>>>> " << Name << " Partitions " << Partitions << " received #2" << Endl; + Cerr << ">>>>> " << Name << " Partitions " << Partitions << " received #2" << Endl << Flush; UNIT_ASSERT_VALUES_EQUAL_C(expected, f.HasValue() ? f.GetValueSync() : Partitions, message); Release(); } void TTestReadSession::WaitAndAssertPartitions(std::set partitions, const TString& message) { auto f = Wait(partitions, message); - f.Wait(TDuration::Seconds(5)); + f.Wait(TDuration::Seconds(60)); Assert(partitions, f, message); } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index c70ef763699..5718b6f3081 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -787,6 +787,9 @@ message TStatusResponse { optional int64 UsedReserveSize = 31; optional TAggregatedCounters AggregatedCounters = 32; + + optional uint32 Generation = 33; + optional uint64 Cookie = 34; } message TConsumerResult { @@ -991,6 +994,8 @@ message TEvCheckPartitionStatusResponse { message TEvReadingPartitionStatusRequest { optional string Consumer = 1; optional uint32 PartitionId = 2; + optional uint32 Generation = 3; + optional uint32 Cookie = 4; }; // The consumer's reading of the partition is finished (from ReadSession) -- cgit v1.3