diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-17 15:37:34 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-17 15:37:34 +0300 |
commit | a8900aae714224b850fcd69326ba16381ec39b32 (patch) | |
tree | 715b5e757806c8c0038847c339c65a457a27d018 | |
parent | 014ff1fcc2d92fee3ff2096f203a4bc561f79b83 (diff) | |
download | ydb-a8900aae714224b850fcd69326ba16381ec39b32.tar.gz |
Improve TestShard KIKIMR-11082
-rw-r--r-- | ydb/core/protos/msgbus.proto | 2 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_impl.cpp | 52 | ||||
-rw-r--r-- | ydb/core/test_tablet/load_actor_impl.h | 9 |
3 files changed, 54 insertions, 9 deletions
diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index 0eb42e61cd..41755259e1 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -760,6 +760,8 @@ message TTestShardControlRequest { optional uint64 MinDataBytes = 4; // then random keys are collected until total length drops below MinDataBytes optional uint32 MaxInFlight = 5; optional uint64 ValidateAfterBytes = 9; // number of bytes to write before starting automatic validation + optional uint32 MaxReadsInFlight = 10; + optional bool ResetWritePeriodOnFull = 11; // restart write periods when inflight is full repeated TSizeInterval Sizes = 6; // distrubution of generated value size repeated TTimeInterval WritePeriods = 7; // time between two events repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index a87c132eb0..d7246cd16f 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -19,6 +19,7 @@ namespace NKikimr::NTestShard { Send(MakeStateServerInterfaceActorId(), new TEvStateServerConnect(Settings.GetStorageServerHost(), Settings.GetStorageServerPort())); Send(parentId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT)); + NextWriteTimestamp = TActivationContext::Monotonic(); Become(&TThis::StateFunc); } @@ -55,21 +56,53 @@ namespace NKikimr::NTestShard { RunValidation(false); } } else { // resume load - if (WritesInFlight.size() < Settings.GetMaxInFlight()) { // write until there is space in inflight - IssueWrite(); - if (WritesInFlight.size() < Settings.GetMaxInFlight()) { - TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0)); + const TMonotonic now = TActivationContext::Monotonic(); + + bool canWriteMore = false; + if (WritesInFlight.size() < Settings.GetMaxInFlight()) { + if (NextWriteTimestamp <= now) { + IssueWrite(); + if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) { + NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods()); + canWriteMore = NextWriteTimestamp <= now; + } else { + NextWriteTimestamp = TMonotonic::Max(); + } + } else if (!WriteOnTimeScheduled) { + Y_VERIFY(NextWriteTimestamp != TMonotonic::Max()); + TActivationContext::Schedule(NextWriteTimestamp, new IEventHandle(EvWriteOnTime, 0, SelfId(), {}, nullptr, 0)); + WriteOnTimeScheduled = true; } } - if (ReadsInFlight.size() < 10 && IssueRead()) { - TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0)); + + bool canReadMore = false; + if (ReadsInFlight.size() < Settings.GetMaxReadsInFlight()) { + canReadMore = IssueRead(); } + if (BytesOfData > Settings.GetMaxDataBytes()) { // delete some data if needed IssueDelete(); } + + if (!DoSomeActionInFlight && (canWriteMore || canReadMore)) { + TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0)); + DoSomeActionInFlight = true; + } } } + void TLoadActor::HandleDoSomeAction() { + Y_VERIFY(DoSomeActionInFlight); + DoSomeActionInFlight = false; + Action(); + } + + void TLoadActor::HandleWriteOnTime() { + Y_VERIFY(WriteOnTimeScheduled); + WriteOnTimeScheduled = false; + Action(); + } + void TLoadActor::Handle(TEvStateServerStatus::TPtr ev) { if (ev->Get()->Connected) { RunValidation(true); @@ -83,7 +116,7 @@ namespace NKikimr::NTestShard { Y_VERIFY(interval.HasFrequency() && interval.HasMaxIntervalMs()); const double frequency = interval.GetFrequency(); const double xMin = exp(-frequency * interval.GetMaxIntervalMs() * 1e-3); - const double x = Max(xMin, TAppData::RandomProvider->GenRandReal2()); + const double x = Max(xMin, RandomNumber<double>()); return TDuration::Seconds(-log(x) / frequency); } @@ -99,7 +132,7 @@ namespace NKikimr::NTestShard { const auto& interval = intervals[PickInterval(intervals)]; Y_VERIFY(interval.HasMin() && interval.HasMax() && interval.GetMin() <= interval.GetMax()); *isInline = interval.GetInline(); - return TAppData::RandomProvider->Uniform(interval.GetMin(), interval.GetMax()); + return interval.GetMin() + RandomNumber<size_t>(interval.GetMax() - interval.GetMin() + 1); } std::unique_ptr<TEvKeyValue::TEvRequest> TLoadActor::CreateRequest() { @@ -161,6 +194,9 @@ namespace NKikimr::NTestShard { ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult()); ProcessReadResult(record.GetCookie(), record.GetReadResult()); } + if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) { + NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods()); + } Action(); } diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index 47c46901d6..75691df1ae 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -31,6 +31,7 @@ namespace NKikimr::NTestShard { enum { EvValidationFinished = EventSpaceBegin(TEvents::ES_PRIVATE), EvDoSomeAction, + EvWriteOnTime, }; struct TEvValidationFinished : TEventLocal<TEvValidationFinished, EvValidationFinished> { @@ -62,7 +63,8 @@ namespace NKikimr::NTestShard { hFunc(TEvStateServerStatus, Handle); hFunc(TEvStateServerWriteResult, Handle); hFunc(TEvValidationFinished, Handle); - cFunc(EvDoSomeAction, Action); + cFunc(EvDoSomeAction, HandleDoSomeAction); + cFunc(EvWriteOnTime, HandleWriteOnTime); cFunc(TEvents::TSystem::Poison, PassAway); cFunc(TEvents::TSystem::Wakeup, HandleWakeup); ) @@ -117,11 +119,16 @@ namespace NKikimr::NTestShard { TTimeSeries StateServerWriteLatency; TTimeSeries WriteLatency; TTimeSeries ReadLatency; + TMonotonic NextWriteTimestamp; + bool WriteOnTimeScheduled = false; + bool DoSomeActionInFlight = false; void GenerateKeyValue(TString *key, TString *value, bool *isInline); void IssueWrite(); void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results); void TrimBytesWritten(TInstant now); + void HandleWriteOnTime(); + void HandleDoSomeAction(); std::unordered_map<ui64, std::tuple<TString, ui32, ui32, TMonotonic>> ReadsInFlight; std::unordered_map<TString, ui32> KeysBeingRead; |