summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/persqueue/events/internal.h4
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/partition_read.cpp2
-rw-r--r--ydb/core/persqueue/read_balancer.cpp55
-rw-r--r--ydb/core/persqueue/read_balancer.h17
-rw-r--r--ydb/core/persqueue/read_balancer__types.cpp21
-rw-r--r--ydb/core/persqueue/ut/autoscaling_ut.cpp4
-rw-r--r--ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp30
-rw-r--r--ydb/core/protos/pqconfig.proto5
10 files changed, 110 insertions, 32 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 44b5cf31e0a..58a11679b4a 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -1090,9 +1090,11 @@ struct TEvPQ {
struct TEvReadingPartitionStatusRequest : public TEventPB<TEvReadingPartitionStatusRequest, NKikimrPQ::TEvReadingPartitionStatusRequest, EvReadingPartitionStatusRequest> {
TEvReadingPartitionStatusRequest() = default;
- TEvReadingPartitionStatusRequest(const TString& consumer, ui32 partitionId) {
+ TEvReadingPartitionStatusRequest(const TString& consumer, ui32 partitionId, ui32 generation, ui64 cookie) {
Record.SetConsumer(consumer);
Record.SetPartitionId(partitionId);
+ Record.SetGeneration(generation); // sender passes its TabletGeneration
+ Record.SetCookie(cookie); // sender passes its incremented PQRBCookie counter
}
};
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 76829604b4c..0bf4f565f89 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -631,6 +631,8 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex
void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TStatusResponse::TPartResult result;
result.SetPartition(Partition.InternalPartitionId);
+ result.SetGeneration(TabletGeneration);
+ result.SetCookie(++PQRBCookie);
if (DiskIsFull || WaitingForSubDomainQuota(ctx)) {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index c8840e87900..613dc00147e 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -679,6 +679,7 @@ private:
TDuration InitDuration;
bool InitDone;
bool NewPartition;
+ ui64 PQRBCookie = 0;
THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners;
THashSet<TActorId> OwnerPipes;
@@ -781,4 +782,3 @@ private:
};
} // namespace NKikimr::NPQ
-
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 3ff237ae45c..a6f7f80a959 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -30,7 +30,7 @@ namespace NKikimr::NPQ {
static const ui32 MAX_USER_ACTS = 1000;
void TPartition::SendReadingFinished(const TString& consumer) {
- Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
+ Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
}
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 1090f5651d5..aaeb5ac1968 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -718,12 +718,14 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
for (const auto& partRes : record.GetPartResult()) {
auto partitionId = partRes.GetPartition();
+ auto generation = partRes.GetGeneration();
+ auto cookie = partRes.GetCookie();
for (const auto& consumer : partRes.GetConsumerResult()) {
if (consumer.GetReadingFinished()) {
auto it = ClientsInfo.find(consumer.GetConsumer());
if (it != ClientsInfo.end()) {
auto& clientInfo = it->second;
- if (clientInfo.IsReadeable(partitionId) && clientInfo.SetCommittedState(partitionId)) {
+ if (clientInfo.IsReadeable(partitionId) && clientInfo.SetCommittedState(partitionId, generation, cookie)) {
clientInfo.ProccessReadingFinished(partRes.GetPartition(), ctx);
}
}
@@ -1127,8 +1129,8 @@ bool TPersQueueReadBalancer::TClientInfo::IsFinished(ui32 partitionId) const {
return it->second.IsFinished();
}
-bool TPersQueueReadBalancer::TClientInfo::SetCommittedState(ui32 partitionId) {
- return ReadingPartitionStatus[partitionId].SetCommittedState();
+bool TPersQueueReadBalancer::TClientInfo::SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie) {
+ return ReadingPartitionStatus[partitionId].SetCommittedState(generation, cookie);
}
TPersQueueReadBalancer::TClientGroupInfo* TPersQueueReadBalancer::TClientInfo::FindGroup(ui32 partitionId) {
@@ -1671,8 +1673,11 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions(
const TActorContext& ctx) {
std::deque<ui32> freePartitions = std::move(FreePartitions);
+ std::deque<ui32> toOtherPartitions;
for (auto& [sessionKey, sessionInfo] : SessionsInfo) {
+ auto& pipe = sessionKey.first;
+
ui32 realDesired = (allowPlusOne > 0) ? desired + 1 : desired;
if (allowPlusOne > 0) {
--allowPlusOne;
@@ -1687,8 +1692,13 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions(
while (req > 0 && !freePartitions.empty()) {
auto partitionId = freePartitions.front();
if (partitionPredicate(partitionId)) {
- --req;
- LockPartition(sessionKey.first, sessionInfo, partitionId, ctx);
+ auto& status = ClientInfo.GetPartitionReadingStatus(partitionId);
+ if (status.BalanceToOtherPipe() && status.LastPipe != pipe || SessionsInfo.size() == 1) {
+ --req;
+ LockPartition(pipe, sessionInfo, partitionId, ctx);
+ } else {
+ toOtherPartitions.push_back(partitionId);
+ }
} else {
FreePartitions.push_back(partitionId);
}
@@ -1700,6 +1710,38 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockMissingPartitions(
}
}
+ if (!toOtherPartitions.empty()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
+ GetPrefix() << "client: "<< ClientId << " balance group " << Group << " partitions " << JoinRange(", ", toOtherPartitions.begin(), toOtherPartitions.end()) << " to other sessions");
+
+ for (auto& [sessionKey, sessionInfo] : SessionsInfo) {
+ auto& pipe = sessionKey.first;
+ ui32 realDesired = desired + 1;
+
+ ssize_t actual = actualExtractor(sessionInfo);
+ if (actual >= realDesired) {
+ continue;
+ }
+
+ ssize_t req = ((ssize_t)realDesired) - actual;
+ size_t possibleIterations = toOtherPartitions.size();
+ while (req > 0 && !toOtherPartitions.empty() && possibleIterations) {
+ auto partitionId = toOtherPartitions.front();
+ toOtherPartitions.pop_front();
+
+ auto& status = ClientInfo.GetPartitionReadingStatus(partitionId);
+ if (status.LastPipe != pipe) {
+ --req;
+ --possibleIterations;
+ LockPartition(pipe, sessionInfo, partitionId, ctx);
+ } else {
+ --possibleIterations;
+ toOtherPartitions.push_back(partitionId);
+ }
+ }
+ }
+ }
+
FreePartitions.insert(FreePartitions.end(), freePartitions.begin(), freePartitions.end());
}
@@ -1958,7 +2000,7 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPt
return;
}
- if (clientInfo.SetCommittedState(partitionId)) {
+ if (clientInfo.SetCommittedState(partitionId, r.GetGeneration(), r.GetCookie())) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
"The offset of the partition " << partitionId << " was commited by " << r.GetConsumer());
@@ -2048,6 +2090,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvReadingPartitionFinishedReq
<< ". Scheduled release of the partition for re-reading. Delay=" << delay << " seconds,"
<< " firstMessage=" << r.GetStartedReadingFromEndOffset() << ", " << GetSdkDebugString(r.GetScaleAwareSDK()));
+ status.LastPipe = ev->Sender;
ctx.Schedule(TDuration::Seconds(delay), new TEvPQ::TEvWakeupReleasePartition(r.GetConsumer(), partitionId, status.Cookie));
}
}
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index 640541f2d86..b3375c4c379 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -212,10 +212,17 @@ private:
size_t Iteration = 0;
ui64 Cookie = 0;
- // Return true if the reading of the partition has been finished and children's partition are readable.
+ TActorId LastPipe;
+
+ // Generation of PQ-tablet and cookie for synchronization of commit information (zero until the first accepted update).
+ ui32 PartitionGeneration = 0;
+ ui64 PartitionCookie = 0;
+
+ // Return true if the reading of the partition has been finished and children's partitions are readable.
bool IsFinished() const;
// Return true if children's partitions can't be balance separately.
bool NeedReleaseChildren() const;
+ bool BalanceToOtherPipe() const;
// Called when reading from a partition is started.
// Return true if the reading of the partition has been finished before.
@@ -226,7 +233,7 @@ private:
// Called when the partition is inactive and commited offset is equal to EndOffset.
// Return true if the commited status changed.
- bool SetCommittedState();
+ bool SetCommittedState(ui32 generation, ui64 cookie);
// Called when the partition reading finished.
// Return true if the reading status changed.
bool SetFinishedState(bool scaleAwareSDK, bool startedReadingFromEndOffset);
@@ -260,10 +267,10 @@ private:
};
struct TClientGroupInfo {
- TClientGroupInfo(const TClientInfo& clientInfo)
+ TClientGroupInfo(TClientInfo& clientInfo)
: ClientInfo(clientInfo) {}
- const TClientInfo& ClientInfo;
+ TClientInfo& ClientInfo;
TString ClientId;
TString Topic;
@@ -352,7 +359,7 @@ private:
bool IsReadeable(ui32 partitionId) const;
bool IsFinished(ui32 partitionId) const;
- bool SetCommittedState(ui32 partitionId);
+ bool SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie);
TClientGroupInfo* FindGroup(ui32 partitionId);
};
diff --git a/ydb/core/persqueue/read_balancer__types.cpp b/ydb/core/persqueue/read_balancer__types.cpp
index a232c492583..803d933ede4 100644
--- a/ydb/core/persqueue/read_balancer__types.cpp
+++ b/ydb/core/persqueue/read_balancer__types.cpp
@@ -15,6 +15,10 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::NeedReleaseChildren() cons
return !(Commited || (ReadingFinished && !ScaleAwareSDK));
}
+bool TPersQueueReadBalancer::TReadingPartitionStatus::BalanceToOtherPipe() const {
+ return LastPipe && !Commited && ReadingFinished && !ScaleAwareSDK;
+}
+
bool TPersQueueReadBalancer::TReadingPartitionStatus::StartReading() {
return std::exchange(ReadingFinished, false);
}
@@ -25,9 +29,16 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::StopReading() {
return NeedReleaseChildren();
}
-bool TPersQueueReadBalancer::TReadingPartitionStatus::SetCommittedState() {
- Iteration = 0;
- return !std::exchange(Commited, true);
+bool TPersQueueReadBalancer::TReadingPartitionStatus::SetCommittedState(ui32 generation, ui64 cookie) {
+ if (PartitionGeneration < generation || (PartitionGeneration == generation && PartitionCookie < cookie)) {
+ Iteration = 0;
+ PartitionGeneration = generation;
+ PartitionCookie = cookie;
+
+ return !std::exchange(Commited, true);
+ }
+
+ return false;
}
bool TPersQueueReadBalancer::TReadingPartitionStatus::SetFinishedState(bool scaleAwareSDK, bool startedReadingFromEndOffset) {
@@ -44,6 +55,9 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::SetFinishedState(bool scal
} else {
++Iteration;
}
+ if (scaleAwareSDK || currentStatus) {
+ LastPipe = TActorId();
+ }
return currentStatus && !previousStatus;
}
@@ -54,6 +68,7 @@ bool TPersQueueReadBalancer::TReadingPartitionStatus::Reset() {
ReadingFinished = false;
Commited = false;
++Cookie;
+ LastPipe = TActorId();
return result;
};
diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp
index 9becbd72300..34ce7796bd2 100644
--- a/ydb/core/persqueue/ut/autoscaling_ut.cpp
+++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp
@@ -283,6 +283,7 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
TTestReadSession readSession1("Session-0", client, Max<size_t>(), false);
readSession1.Offsets[0] = 1;
readSession1.WaitAndAssertPartitions({0, 1, 2}, "Must read all exists partitions because read the partition 0 from offset 1");
+ readSession1.Offsets[0] = 0;
TTestReadSession readSession2("Session-1", client, Max<size_t>(), false, 0);
readSession2.Offsets[0] = 0;
@@ -298,7 +299,10 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
readSession2.Assert({0}, p2, "");
readSession2.WaitAndAssertPartitions({}, "Partition must be released because reding finished");
+ readSession2.Run();
+
readSession1.WaitAndAssertPartitions({}, "Partitions must be read only from Session-1");
+ readSession1.WaitAndAssertPartitions({0}, "Partition 0 must rebalance to other sessions (Session-0)");
readSession1.Close();
readSession2.Close();
diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
index c7f6bc72cdd..c6f26a9c295 100644
--- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
@@ -29,7 +29,7 @@ TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) {
void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) {
Sleep(TDuration::Seconds(1));
- Cerr << "ALTER_SCHEME: " << scheme << Endl;
+ Cerr << "ALTER_SCHEME: " << scheme << Endl << Flush;
const auto sender = setup.GetRuntime().AllocateEdgeActor();
const auto request = CreateRequest(txId, CreateTransaction("/Root", scheme));
@@ -144,7 +144,7 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
<< ", message=" << message.GetData()
<< ", seqNo=" << message.GetSeqNo()
<< ", offset=" << message.GetOffset()
- << Endl;
+ << Endl << Flush;
ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
message.GetSeqNo(),
message.GetOffset(),
@@ -165,14 +165,14 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
readSettings.EventHandlers_.StartPartitionSessionHandler(
[&]
(TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
- Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl;
+ Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.insert(partitionId); });
if (Offsets.contains(partitionId)) {
- Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl;
+ Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl << Flush;
ev.Confirm(Offsets[partitionId], TMaybe<ui64>());
} else {
- Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl;
+ Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush;
ev.Confirm();
}
});
@@ -180,26 +180,26 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
readSettings.EventHandlers_.StopPartitionSessionHandler(
[&]
(TReadSessionEvent::TStopPartitionSessionEvent& ev) mutable {
- Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl;
+ Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
- Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl;
+ Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl << Flush;
ev.Confirm();
});
readSettings.EventHandlers_.PartitionSessionClosedHandler(
[&]
(TReadSessionEvent::TPartitionSessionClosedEvent& ev) mutable {
- Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl;
+ Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
- Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl;
+ Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl << Flush;
});
readSettings.EventHandlers_.SessionClosedHandler(
[Name=name]
(const TSessionClosedEvent& ev) mutable {
- Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl;
+ Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
});
Session = client.CreateReadSession(readSettings);
@@ -219,17 +219,17 @@ void TTestReadSession::Commit() {
}
void TTestReadSession::Acquire() {
- Cerr << ">>>>> " << Name << " Acquire()" << Endl;
+ Cerr << ">>>>> " << Name << " Acquire()" << Endl << Flush;
Semaphore.Acquire();
}
void TTestReadSession::Release() {
- Cerr << ">>>>> " << Name << " Release()" << Endl;
+ Cerr << ">>>>> " << Name << " Release()" << Endl << Flush;
Semaphore.Release();
}
NThreading::TFuture<std::set<size_t>> TTestReadSession::Wait(std::set<size_t> partitions, const TString& message) {
- Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl;
+ Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl << Flush;
with_lock (Lock) {
ExpectedPartitions = partitions;
@@ -244,14 +244,14 @@ NThreading::TFuture<std::set<size_t>> TTestReadSession::Wait(std::set<size_t> pa
}
void TTestReadSession::Assert(const std::set<size_t>& expected, NThreading::TFuture<std::set<size_t>> f, const TString& message) {
- Cerr << ">>>>> " << Name << " Partitions " << Partitions << " received #2" << Endl;
+ Cerr << ">>>>> " << Name << " Partitions " << Partitions << " received #2" << Endl << Flush;
UNIT_ASSERT_VALUES_EQUAL_C(expected, f.HasValue() ? f.GetValueSync() : Partitions, message);
Release();
}
void TTestReadSession::WaitAndAssertPartitions(std::set<size_t> partitions, const TString& message) {
auto f = Wait(partitions, message);
- f.Wait(TDuration::Seconds(5));
+ f.Wait(TDuration::Seconds(60));
Assert(partitions, f, message);
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index c70ef763699..5718b6f3081 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -787,6 +787,9 @@ message TStatusResponse {
optional int64 UsedReserveSize = 31;
optional TAggregatedCounters AggregatedCounters = 32;
+
+ optional uint32 Generation = 33;
+ optional uint64 Cookie = 34;
}
message TConsumerResult {
@@ -991,6 +994,8 @@ message TEvCheckPartitionStatusResponse {
message TEvReadingPartitionStatusRequest {
optional string Consumer = 1;
optional uint32 PartitionId = 2;
+ optional uint32 Generation = 3;
+ optional uint64 Cookie = 4; // 64-bit: filled from a ui64 counter, same width as TStatusResponse.Cookie
};
// The consumer's reading of the partition is finished (from ReadSession)