aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-08-02 14:23:22 +0500
committerGitHub <noreply@github.com>2024-08-02 14:23:22 +0500
commit53e1321330a5bdcfeb31fffdc17f59434c74a05b (patch)
treea74e059b694ef3a067cf25cf21faf54b7c54ce26
parent04428602d023e7af8832281a44a8d0cd6ff38447 (diff)
downloadydb-53e1321330a5bdcfeb31fffdc17f59434c74a05b.tar.gz
Split the partition if there is only more than 1 producer (#7379)
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/partition_write.cpp41
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp80
-rw-r--r--ydb/core/persqueue/ut/utils_ut.cpp81
-rw-r--r--ydb/core/persqueue/ut/ya.make1
-rw-r--r--ydb/core/persqueue/utils.cpp27
-rw-r--r--ydb/core/persqueue/utils.h18
7 files changed, 188 insertions, 62 deletions
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 5228bdeb444..cfa7563cb22 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -916,6 +916,8 @@ private:
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
TRowVersion LastEmittedHeartbeat;
+ TLastCounter SourceIdCounter;
+
const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
bool ClosedInternalPartition = false;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 2cb1f066461..ac235fc35fe 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -239,13 +239,14 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct
void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
PQ_LOG_T("TPartition::AnswerCurrentWrites. Responses.size()=" << Responses.size());
+ const auto now = ctx.Now();
ui64 offset = EndOffset;
while (!Responses.empty()) {
const auto& response = Responses.front();
const TDuration queueTime = response.QueueTime;
- const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline;
+ const TDuration writeTime = now - response.WriteTimeBaseline;
if (response.IsWrite()) {
const auto& writeResponse = response.GetWrite();
@@ -257,6 +258,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
bool already = false;
+ SourceIdCounter.Use(s, now);
auto it = SourceIdStorage.GetInMemorySourceIds().find(s);
ui64 maxSeqNo = 0;
@@ -485,13 +487,15 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
HaveWriteMsg = false;
+ const auto now = ctx.Now();
+
for (auto& [sourceId, info] : TxSourceIdForPostPersist) {
auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId);
if (it.IsEnd()) {
- SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, ctx.Now());
+ SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, now);
} else {
ui64 seqNo = std::max(info.SeqNo, it->second.SeqNo);
- SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, ctx.Now()));
+ SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, now));
}
}
TxSourceIdForPostPersist.clear();
@@ -505,8 +509,8 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
ui64 prevEndOffset = EndOffset;
- ui32 totalLatencyMs = (ctx.Now() - WriteCycleStartTime).MilliSeconds();
- ui32 writeLatencyMs = (ctx.Now() - WriteStartTime).MilliSeconds();
+ ui32 totalLatencyMs = (now - WriteCycleStartTime).MilliSeconds();
+ ui32 writeLatencyMs = (now - WriteStartTime).MilliSeconds();
WriteLatency.IncFor(writeLatencyMs, 1);
if (writeLatencyMs >= AppData(ctx)->PQConfig.GetWriteLatencyBigMs()) {
@@ -522,7 +526,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
UpdateAfterWriteCounters(true);
//All ok
- auto now = ctx.Now();
for (auto& avg : AvgWriteBytes) {
avg.Update(WriteNewSize, now);
}
@@ -538,11 +541,9 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
if (SupportivePartitionTimeLag) {
SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds());
}
- if (SplitMergeEnabled(Config)) {
- SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now);
- auto needScaling = CheckScaleStatus(ctx);
- ChangeScaleStatusIfNeeded(needScaling);
- }
+
+ auto writeNewSizeFull = WriteNewSizeFull;
+
WriteCycleSize = 0;
WriteNewSize = 0;
WriteNewSizeFull = 0;
@@ -556,6 +557,12 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
AnswerCurrentWrites(ctx);
SyncMemoryStateWithKVState(ctx);
+ if (SplitMergeEnabled(Config)) {
+ SplitMergeAvgWriteBytes->Update(writeNewSizeFull, now);
+ auto needScaling = CheckScaleStatus(ctx);
+ ChangeScaleStatusIfNeeded(needScaling);
+ }
+
//if EndOffset changed there could be subscriptions witch could be completed
TVector<std::pair<TReadInfo, ui64>> reads = Subscriber.GetReads(EndOffset);
for (auto& read : reads) {
@@ -569,7 +576,10 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
- auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
+ const auto writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
+ const auto sourceIdWindow = TDuration::Seconds(std::min<ui32>(5, Config.GetPartitionStrategy().GetScaleThresholdSeconds()));
+ const auto sourceIdCount = SourceIdCounter.Count(ctx.Now() - sourceIdWindow);
+
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus"
@@ -577,15 +587,17 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
<< " writeSpeedUsagePercent# " << writeSpeedUsagePercent
<< " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds()
<< " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed
+ << " sourceIdCount=" << sourceIdCount
<< " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
+
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
- if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
+ if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent() && sourceIdCount > 1) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." <<
@@ -596,7 +608,8 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." <<
- " Partition: " << Partition
+ " Partition: " << Partition << " writeSpeedUsagePercent: " << writeSpeedUsagePercent <<
+ " Threshold: " << Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()
);
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
index ec9f40c3b12..c9f4e7bd511 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
@@ -400,9 +400,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
- writeSession1->Close(TDuration::Seconds(1));
- writeSession2->Close(TDuration::Seconds(1));
- writeSession3->Close(TDuration::Seconds(1));
+ writeSession1->Close(TDuration::Seconds(2));
+ writeSession2->Close(TDuration::Seconds(2));
+ writeSession3->Close(TDuration::Seconds(2));
readSession.Close();
}
@@ -836,7 +836,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(2)
.DownUtilizationPercent(1)
- .StabilizationWindow(TDuration::Seconds(1))
+ .StabilizationWindow(TDuration::Seconds(2))
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
@@ -844,55 +844,39 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto msg = TString(1_MB, 'a');
- auto writeSession = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
- UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
- UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
- Sleep(TDuration::Seconds(5));
- auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
- UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
-
- bool firstPartitionFound = false;
- for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
- if (partition.GetPartitionId() == 0) {
- firstPartitionFound = true;
- UNIT_ASSERT(!partition.GetActive());
- UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
- auto childIds = partition.GetChildPartitionIds();
- std::sort(childIds.begin(), childIds.end());
- UNIT_ASSERT_EQUAL(childIds[0], 1);
- UNIT_ASSERT_EQUAL(childIds[1], 2);
- }
+ auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
+ auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false);
+
+ {
+ UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
+ UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
+ Sleep(TDuration::Seconds(5));
+ auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
+ UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}
- UNIT_ASSERT(firstPartitionFound);
-
- TString secondPartitionTo = "";
- TString thirdPartitionFrom = "";
- for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
- if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
- UNIT_ASSERT(partition.GetActive());
- if (partition.GetPartitionId() == 1) {
- UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
- secondPartitionTo = *partition.GetToBound();
- }
- if (partition.GetPartitionId() == 2) {
- UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
- thirdPartitionFrom = *partition.GetFromBound();
- }
- UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
- UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
- }
+ {
+ UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
+ UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4)));
+ UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
+ UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
+ Sleep(TDuration::Seconds(5));
+ auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
+ UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
}
- UNIT_ASSERT(!secondPartitionTo.Empty());
- UNIT_ASSERT(!thirdPartitionFrom.Empty());
+ auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
+ auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false);
- auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
- UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
- UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
- Sleep(TDuration::Seconds(5));
- auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
- UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
+ {
+ UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7)));
+ UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8)));
+ UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9)));
+ UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10)));
+ Sleep(TDuration::Seconds(5));
+ auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
+ UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
+ }
}
Y_UNIT_TEST(MidOfRange) {
diff --git a/ydb/core/persqueue/ut/utils_ut.cpp b/ydb/core/persqueue/ut/utils_ut.cpp
new file mode 100644
index 00000000000..85513ea70a9
--- /dev/null
+++ b/ydb/core/persqueue/ut/utils_ut.cpp
@@ -0,0 +1,81 @@
+#include <ydb/core/persqueue/utils.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NPQ {
+
+Y_UNIT_TEST_SUITE(TPQUtilsTest) {
+ Y_UNIT_TEST(TLastCounter) {
+ TLastCounter counter;
+
+ TInstant now = TInstant::Now();
+
+ {
+ auto r = counter.Count(now);
+ UNIT_ASSERT_VALUES_EQUAL(r, 0);
+ }
+
+ {
+ counter.Use("v-1", now);
+ auto r = counter.Count(now);
+ UNIT_ASSERT_VALUES_EQUAL(r, 1);
+ }
+
+ {
+ counter.Use("v-1", now);
+ auto r = counter.Count(now);
+ UNIT_ASSERT_VALUES_EQUAL(r, 1);
+ }
+
+ now += TDuration::Seconds(1);
+
+ {
+ counter.Use("v-1", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 1);
+ }
+
+ {
+ auto r = counter.Count(now);
+ UNIT_ASSERT_VALUES_EQUAL(r, 1);
+ }
+
+ {
+ counter.Use("v-2", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 2);
+ }
+
+ {
+ counter.Use("v-1", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 2);
+ }
+
+ now += TDuration::Seconds(1);
+
+ {
+ counter.Use("v-3", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 2);
+ }
+
+ now += TDuration::Seconds(1);
+
+ {
+ counter.Use("v-3", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 2);
+ }
+
+ now += TDuration::Seconds(1);
+
+ {
+ counter.Use("v-2", now);
+ auto r = counter.Count(now - TDuration::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(r, 2);
+ }
+ }
+}
+
+}
diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make
index 8b3e64ac235..c91c10ecd96 100644
--- a/ydb/core/persqueue/ut/ya.make
+++ b/ydb/core/persqueue/ut/ya.make
@@ -45,6 +45,7 @@ SRCS(
pqrb_describes_ut.cpp
microseconds_sliding_window_ut.cpp
fetch_request_ut.cpp
+ utils_ut.cpp
)
RESOURCE(
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index 828e20cfbec..1003a8003a0 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -298,4 +298,31 @@ TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescrip
return TPartitionGraph(BuildGraph<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>(config.GetPartitions()));
}
+void TLastCounter::Use(const TString& value, const TInstant& now) {
+ const auto full = MaxValueCount == Values.size();
+ if (!Values.empty() && Values[0].Value == value) {
+ auto& v0 = Values[0];
+ if (v0.LastUseTime < now) {
+ v0.LastUseTime = now;
+ if (full && Values[1].LastUseTime != now) {
+ Values.push_back(std::move(v0));
+ Values.pop_front();
+ }
+ }
+ } else if (full && Values[1].Value == value) {
+ Values[1].LastUseTime = now;
+ } else if (!full || Values[0].LastUseTime < now) {
+ if (full) {
+ Values.pop_front();
+ }
+ Values.push_back(Data{now, value});
+ }
+}
+
+size_t TLastCounter::Count(const TInstant& expirationTime) {
+ return std::count_if(Values.begin(), Values.end(), [&](const auto& i) {
+ return i.LastUseTime >= expirationTime;
+ });
+}
+
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h
index 27373a85183..7c42e70ff59 100644
--- a/ydb/core/persqueue/utils.h
+++ b/ydb/core/persqueue/utils.h
@@ -1,5 +1,7 @@
#pragma once
+#include <deque>
+#include <util/datetime/base.h>
#include <util/string/builder.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
@@ -69,4 +71,20 @@ TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config);
TPartitionGraph MakePartitionGraph(const NKikimrPQ::TUpdateBalancerConfig& config);
TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config);
+class TLastCounter {
+ static constexpr size_t MaxValueCount = 2;
+
+public:
+ void Use(const TString& value, const TInstant& now);
+ size_t Count(const TInstant& expirationTime);
+
+private:
+ struct Data {
+ TInstant LastUseTime;
+ TString Value;
+ };
+ std::deque<Data> Values;
+};
+
+
} // NKikimr::NPQ