diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2024-08-02 14:23:22 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-02 14:23:22 +0500 |
commit | 53e1321330a5bdcfeb31fffdc17f59434c74a05b (patch) | |
tree | a74e059b694ef3a067cf25cf21faf54b7c54ce26 | |
parent | 04428602d023e7af8832281a44a8d0cd6ff38447 (diff) | |
download | ydb-53e1321330a5bdcfeb31fffdc17f59434c74a05b.tar.gz |
Split the partition if there is only more than 1 producer (#7379)
-rw-r--r-- | ydb/core/persqueue/partition.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 41 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp | 80 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/utils_ut.cpp | 81 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/utils.cpp | 27 | ||||
-rw-r--r-- | ydb/core/persqueue/utils.h | 18 |
7 files changed, 188 insertions, 62 deletions
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 5228bdeb444..cfa7563cb22 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -916,6 +916,8 @@ private: TDeque<std::unique_ptr<IEventBase>> PendingEvents; TRowVersion LastEmittedHeartbeat; + TLastCounter SourceIdCounter; + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config); bool ClosedInternalPartition = false; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 2cb1f066461..ac235fc35fe 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -239,13 +239,14 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { PQ_LOG_T("TPartition::AnswerCurrentWrites. Responses.size()=" << Responses.size()); + const auto now = ctx.Now(); ui64 offset = EndOffset; while (!Responses.empty()) { const auto& response = Responses.front(); const TDuration queueTime = response.QueueTime; - const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline; + const TDuration writeTime = now - response.WriteTimeBaseline; if (response.IsWrite()) { const auto& writeResponse = response.GetWrite(); @@ -257,6 +258,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { bool already = false; + SourceIdCounter.Use(s, now); auto it = SourceIdStorage.GetInMemorySourceIds().find(s); ui64 maxSeqNo = 0; @@ -485,13 +487,15 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } HaveWriteMsg = false; + const auto now = ctx.Now(); + for (auto& [sourceId, info] : TxSourceIdForPostPersist) { auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId); if (it.IsEnd()) { - SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, ctx.Now()); + SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, now); } else { ui64 seqNo = std::max(info.SeqNo, it->second.SeqNo); - SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, ctx.Now())); + SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, now)); } } TxSourceIdForPostPersist.clear(); @@ -505,8 +509,8 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } ui64 prevEndOffset = EndOffset; - ui32 totalLatencyMs = (ctx.Now() - WriteCycleStartTime).MilliSeconds(); - ui32 writeLatencyMs = (ctx.Now() - WriteStartTime).MilliSeconds(); + ui32 totalLatencyMs = (now - WriteCycleStartTime).MilliSeconds(); + ui32 writeLatencyMs = (now - WriteStartTime).MilliSeconds(); WriteLatency.IncFor(writeLatencyMs, 1); if (writeLatencyMs >= AppData(ctx)->PQConfig.GetWriteLatencyBigMs()) { @@ -522,7 +526,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { UpdateAfterWriteCounters(true); //All ok - auto now = ctx.Now(); for (auto& avg : AvgWriteBytes) { avg.Update(WriteNewSize, now); } @@ -538,11 +541,9 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { if (SupportivePartitionTimeLag) { SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds()); } - if (SplitMergeEnabled(Config)) { - SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now); - auto needScaling = CheckScaleStatus(ctx); - ChangeScaleStatusIfNeeded(needScaling); - } + + auto writeNewSizeFull = WriteNewSizeFull; + WriteCycleSize = 0; WriteNewSize = 0; WriteNewSizeFull = 0; @@ -556,6 +557,12 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { AnswerCurrentWrites(ctx); SyncMemoryStateWithKVState(ctx); + if (SplitMergeEnabled(Config)) { + SplitMergeAvgWriteBytes->Update(writeNewSizeFull, now); + auto needScaling = CheckScaleStatus(ctx); + ChangeScaleStatusIfNeeded(needScaling); + } + //if EndOffset changed there could be subscriptions witch could be completed TVector<std::pair<TReadInfo, ui64>> reads = Subscriber.GetReads(EndOffset); for (auto& read : reads) { @@ -569,7 +576,10 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { - auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed; + const auto writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed; + const auto sourceIdWindow = TDuration::Seconds(std::min<ui32>(5, Config.GetPartitionStrategy().GetScaleThresholdSeconds())); + const auto sourceIdCount = SourceIdCounter.Count(ctx.Now() - sourceIdWindow); + LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "TPartition::CheckScaleStatus" @@ -577,15 +587,17 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { << " writeSpeedUsagePercent# " << writeSpeedUsagePercent << " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds() << " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed + << " sourceIdCount=" << sourceIdCount << " Topic: \"" << TopicName() << "\"." << " Partition: " << Partition ); + auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT || Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; - if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) { + if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent() && sourceIdCount > 1) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." << @@ -596,7 +608,8 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." << - " Partition: " << Partition + " Partition: " << Partition << " writeSpeedUsagePercent: " << writeSpeedUsagePercent << + " Threshold: " << Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent() ); return NKikimrPQ::EScaleStatus::NEED_MERGE; } diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index ec9f40c3b12..c9f4e7bd511 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -400,9 +400,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } - writeSession1->Close(TDuration::Seconds(1)); - writeSession2->Close(TDuration::Seconds(1)); - writeSession3->Close(TDuration::Seconds(1)); + writeSession1->Close(TDuration::Seconds(2)); + writeSession2->Close(TDuration::Seconds(2)); + writeSession3->Close(TDuration::Seconds(2)); readSession.Close(); } @@ -836,7 +836,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { .BeginConfigureAutoPartitioningSettings() .UpUtilizationPercent(2) .DownUtilizationPercent(1) - .StabilizationWindow(TDuration::Seconds(1)) + .StabilizationWindow(TDuration::Seconds(2)) .Strategy(EAutoPartitioningStrategy::ScaleUp) .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings(); @@ -844,55 +844,39 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto msg = TString(1_MB, 'a'); - auto writeSession = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false); - UNIT_ASSERT(writeSession->Write(Msg(msg, 1))); - UNIT_ASSERT(writeSession->Write(Msg(msg, 2))); - Sleep(TDuration::Seconds(5)); - auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); - UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); - - bool firstPartitionFound = false; - for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { - if (partition.GetPartitionId() == 0) { - firstPartitionFound = true; - UNIT_ASSERT(!partition.GetActive()); - UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2); - auto childIds = partition.GetChildPartitionIds(); - std::sort(childIds.begin(), childIds.end()); - UNIT_ASSERT_EQUAL(childIds[0], 1); - UNIT_ASSERT_EQUAL(childIds[1], 2); - } + auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false); + auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false); + + { + UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1))); + UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2))); + Sleep(TDuration::Seconds(5)); + auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); + UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1); } - UNIT_ASSERT(firstPartitionFound); - - TString secondPartitionTo = ""; - TString thirdPartitionFrom = ""; - for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { - if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) { - UNIT_ASSERT(partition.GetActive()); - if (partition.GetPartitionId() == 1) { - UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty()); - secondPartitionTo = *partition.GetToBound(); - } - if (partition.GetPartitionId() == 2) { - UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty()); - thirdPartitionFrom = *partition.GetFromBound(); - } - UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1); - UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0); - } + { + UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3))); + UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4))); + UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5))); + UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6))); + Sleep(TDuration::Seconds(5)); + auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); + UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); } - UNIT_ASSERT(!secondPartitionTo.Empty()); - UNIT_ASSERT(!thirdPartitionFrom.Empty()); + auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false); + auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false); - auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false); - UNIT_ASSERT(writeSession2->Write(Msg(msg, 3))); - UNIT_ASSERT(writeSession2->Write(Msg(msg, 4))); - Sleep(TDuration::Seconds(5)); - auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync(); - UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5); + { + UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7))); + UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8))); + UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9))); + UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10))); + Sleep(TDuration::Seconds(5)); + auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync(); + UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5); + } } Y_UNIT_TEST(MidOfRange) { diff --git a/ydb/core/persqueue/ut/utils_ut.cpp b/ydb/core/persqueue/ut/utils_ut.cpp new file mode 100644 index 00000000000..85513ea70a9 --- /dev/null +++ b/ydb/core/persqueue/ut/utils_ut.cpp @@ -0,0 +1,81 @@ +#include <ydb/core/persqueue/utils.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NPQ { + +Y_UNIT_TEST_SUITE(TPQUtilsTest) { + Y_UNIT_TEST(TLastCounter) { + TLastCounter counter; + + TInstant now = TInstant::Now(); + + { + auto r = counter.Count(now); + UNIT_ASSERT_VALUES_EQUAL(r, 0); + } + + { + counter.Use("v-1", now); + auto r = counter.Count(now); + UNIT_ASSERT_VALUES_EQUAL(r, 1); + } + + { + counter.Use("v-1", now); + auto r = counter.Count(now); + UNIT_ASSERT_VALUES_EQUAL(r, 1); + } + + now += TDuration::Seconds(1); + + { + counter.Use("v-1", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 1); + } + + { + auto r = counter.Count(now); + UNIT_ASSERT_VALUES_EQUAL(r, 1); + } + + { + counter.Use("v-2", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 2); + } + + { + counter.Use("v-1", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 2); + } + + now += TDuration::Seconds(1); + + { + counter.Use("v-3", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 2); + } + + now += TDuration::Seconds(1); + + { + counter.Use("v-3", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 2); + } + + now += TDuration::Seconds(1); + + { + counter.Use("v-2", now); + auto r = counter.Count(now - TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(r, 2); + } + } +} + +} diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index 8b3e64ac235..c91c10ecd96 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -45,6 +45,7 @@ SRCS( pqrb_describes_ut.cpp microseconds_sliding_window_ut.cpp fetch_request_ut.cpp + utils_ut.cpp ) RESOURCE( diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 828e20cfbec..1003a8003a0 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -298,4 +298,31 @@ TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescrip return TPartitionGraph(BuildGraph<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>(config.GetPartitions())); } +void TLastCounter::Use(const TString& value, const TInstant& now) { + const auto full = MaxValueCount == Values.size(); + if (!Values.empty() && Values[0].Value == value) { + auto& v0 = Values[0]; + if (v0.LastUseTime < now) { + v0.LastUseTime = now; + if (full && Values[1].LastUseTime != now) { + Values.push_back(std::move(v0)); + Values.pop_front(); + } + } + } else if (full && Values[1].Value == value) { + Values[1].LastUseTime = now; + } else if (!full || Values[0].LastUseTime < now) { + if (full) { + Values.pop_front(); + } + Values.push_back(Data{now, value}); + } +} + +size_t TLastCounter::Count(const TInstant& expirationTime) { + return std::count_if(Values.begin(), Values.end(), [&](const auto& i) { + return i.LastUseTime >= expirationTime; + }); +} + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 27373a85183..7c42e70ff59 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -1,5 +1,7 @@ #pragma once +#include <deque> +#include <util/datetime/base.h> #include <util/string/builder.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/pqconfig.pb.h> @@ -69,4 +71,20 @@ TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config); TPartitionGraph MakePartitionGraph(const NKikimrPQ::TUpdateBalancerConfig& config); TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config); +class TLastCounter { + static constexpr size_t MaxValueCount = 2; + +public: + void Use(const TString& value, const TInstant& now); + size_t Count(const TInstant& expirationTime); + +private: + struct Data { + TInstant LastUseTime; + TString Value; + }; + std::deque<Data> Values; +}; + + } // NKikimr::NPQ |