aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-07-17 15:37:34 +0300
committeralexvru <alexvru@ydb.tech>2023-07-17 15:37:34 +0300
commita8900aae714224b850fcd69326ba16381ec39b32 (patch)
tree715b5e757806c8c0038847c339c65a457a27d018
parent014ff1fcc2d92fee3ff2096f203a4bc561f79b83 (diff)
downloadydb-a8900aae714224b850fcd69326ba16381ec39b32.tar.gz
Improve TestShard KIKIMR-11082
-rw-r--r--ydb/core/protos/msgbus.proto2
-rw-r--r--ydb/core/test_tablet/load_actor_impl.cpp52
-rw-r--r--ydb/core/test_tablet/load_actor_impl.h9
3 files changed, 54 insertions, 9 deletions
diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto
index 0eb42e61cd..41755259e1 100644
--- a/ydb/core/protos/msgbus.proto
+++ b/ydb/core/protos/msgbus.proto
@@ -760,6 +760,8 @@ message TTestShardControlRequest {
optional uint64 MinDataBytes = 4; // then random keys are collected until total length drops below MinDataBytes
optional uint32 MaxInFlight = 5;
optional uint64 ValidateAfterBytes = 9; // number of bytes to write before starting automatic validation
+ optional uint32 MaxReadsInFlight = 10;
+ optional bool ResetWritePeriodOnFull = 11; // restart write periods when inflight is full
repeated TSizeInterval Sizes = 6; // distrubution of generated value size
repeated TTimeInterval WritePeriods = 7; // time between two events
repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts
diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp
index a87c132eb0..d7246cd16f 100644
--- a/ydb/core/test_tablet/load_actor_impl.cpp
+++ b/ydb/core/test_tablet/load_actor_impl.cpp
@@ -19,6 +19,7 @@ namespace NKikimr::NTestShard {
Send(MakeStateServerInterfaceActorId(), new TEvStateServerConnect(Settings.GetStorageServerHost(),
Settings.GetStorageServerPort()));
Send(parentId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT));
+ NextWriteTimestamp = TActivationContext::Monotonic();
Become(&TThis::StateFunc);
}
@@ -55,21 +56,53 @@ namespace NKikimr::NTestShard {
RunValidation(false);
}
} else { // resume load
- if (WritesInFlight.size() < Settings.GetMaxInFlight()) { // write until there is space in inflight
- IssueWrite();
- if (WritesInFlight.size() < Settings.GetMaxInFlight()) {
- TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0));
+ const TMonotonic now = TActivationContext::Monotonic();
+
+ bool canWriteMore = false;
+ if (WritesInFlight.size() < Settings.GetMaxInFlight()) {
+ if (NextWriteTimestamp <= now) {
+ IssueWrite();
+ if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
+ NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods());
+ canWriteMore = NextWriteTimestamp <= now;
+ } else {
+ NextWriteTimestamp = TMonotonic::Max();
+ }
+ } else if (!WriteOnTimeScheduled) {
+ Y_VERIFY(NextWriteTimestamp != TMonotonic::Max());
+ TActivationContext::Schedule(NextWriteTimestamp, new IEventHandle(EvWriteOnTime, 0, SelfId(), {}, nullptr, 0));
+ WriteOnTimeScheduled = true;
}
}
- if (ReadsInFlight.size() < 10 && IssueRead()) {
- TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0));
+
+ bool canReadMore = false;
+ if (ReadsInFlight.size() < Settings.GetMaxReadsInFlight()) {
+ canReadMore = IssueRead();
}
+
if (BytesOfData > Settings.GetMaxDataBytes()) { // delete some data if needed
IssueDelete();
}
+
+ if (!DoSomeActionInFlight && (canWriteMore || canReadMore)) {
+ TActivationContext::Send(new IEventHandle(EvDoSomeAction, 0, SelfId(), {}, nullptr, 0));
+ DoSomeActionInFlight = true;
+ }
}
}
+ void TLoadActor::HandleDoSomeAction() {
+ Y_VERIFY(DoSomeActionInFlight);
+ DoSomeActionInFlight = false;
+ Action();
+ }
+
+ void TLoadActor::HandleWriteOnTime() {
+ Y_VERIFY(WriteOnTimeScheduled);
+ WriteOnTimeScheduled = false;
+ Action();
+ }
+
void TLoadActor::Handle(TEvStateServerStatus::TPtr ev) {
if (ev->Get()->Connected) {
RunValidation(true);
@@ -83,7 +116,7 @@ namespace NKikimr::NTestShard {
Y_VERIFY(interval.HasFrequency() && interval.HasMaxIntervalMs());
const double frequency = interval.GetFrequency();
const double xMin = exp(-frequency * interval.GetMaxIntervalMs() * 1e-3);
- const double x = Max(xMin, TAppData::RandomProvider->GenRandReal2());
+ const double x = Max(xMin, RandomNumber<double>());
return TDuration::Seconds(-log(x) / frequency);
}
@@ -99,7 +132,7 @@ namespace NKikimr::NTestShard {
const auto& interval = intervals[PickInterval(intervals)];
Y_VERIFY(interval.HasMin() && interval.HasMax() && interval.GetMin() <= interval.GetMax());
*isInline = interval.GetInline();
- return TAppData::RandomProvider->Uniform(interval.GetMin(), interval.GetMax());
+ return interval.GetMin() + RandomNumber<size_t>(interval.GetMax() - interval.GetMin() + 1);
}
std::unique_ptr<TEvKeyValue::TEvRequest> TLoadActor::CreateRequest() {
@@ -161,6 +194,9 @@ namespace NKikimr::NTestShard {
ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult());
ProcessReadResult(record.GetCookie(), record.GetReadResult());
}
+ if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
+ NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods());
+ }
Action();
}
diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h
index 47c46901d6..75691df1ae 100644
--- a/ydb/core/test_tablet/load_actor_impl.h
+++ b/ydb/core/test_tablet/load_actor_impl.h
@@ -31,6 +31,7 @@ namespace NKikimr::NTestShard {
enum {
EvValidationFinished = EventSpaceBegin(TEvents::ES_PRIVATE),
EvDoSomeAction,
+ EvWriteOnTime,
};
struct TEvValidationFinished : TEventLocal<TEvValidationFinished, EvValidationFinished> {
@@ -62,7 +63,8 @@ namespace NKikimr::NTestShard {
hFunc(TEvStateServerStatus, Handle);
hFunc(TEvStateServerWriteResult, Handle);
hFunc(TEvValidationFinished, Handle);
- cFunc(EvDoSomeAction, Action);
+ cFunc(EvDoSomeAction, HandleDoSomeAction);
+ cFunc(EvWriteOnTime, HandleWriteOnTime);
cFunc(TEvents::TSystem::Poison, PassAway);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
)
@@ -117,11 +119,16 @@ namespace NKikimr::NTestShard {
TTimeSeries StateServerWriteLatency;
TTimeSeries WriteLatency;
TTimeSeries ReadLatency;
+ TMonotonic NextWriteTimestamp;
+ bool WriteOnTimeScheduled = false;
+ bool DoSomeActionInFlight = false;
void GenerateKeyValue(TString *key, TString *value, bool *isInline);
void IssueWrite();
void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results);
void TrimBytesWritten(TInstant now);
+ void HandleWriteOnTime();
+ void HandleDoSomeAction();
std::unordered_map<ui64, std::tuple<TString, ui32, ui32, TMonotonic>> ReadsInFlight;
std::unordered_map<TString, ui32> KeysBeingRead;