aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortarum <tarum@yandex-team.com>2023-04-17 16:27:57 +0300
committertarum <tarum@yandex-team.com>2023-04-17 16:27:57 +0300
commitbd6d11bf605a9ccfbbcd6a4ed5b9a1ee5ab18c03 (patch)
treede114113ae86f1d0bc8b7cc7e1be6ca671f2cfae
parent37c06c5b32df138ddbb62bd322e57ddc3d7e27e1 (diff)
downloadydb-bd6d11bf605a9ccfbbcd6a4ed5b9a1ee5ab18c03.tar.gz
Improve consistency in how finish is reported across load actors, fix few smaller issues
-rw-r--r--ydb/core/load_test/events.h4
-rw-r--r--ydb/core/load_test/group_write.cpp6
-rw-r--r--ydb/core/load_test/keyvalue_write.cpp14
-rw-r--r--ydb/core/load_test/kqp.cpp33
-rw-r--r--ydb/core/load_test/memory.cpp20
-rw-r--r--ydb/core/load_test/pdisk_log.cpp4
-rw-r--r--ydb/core/load_test/service_actor.cpp2
-rw-r--r--ydb/core/load_test/vdisk_write.cpp2
8 files changed, 58 insertions, 27 deletions
diff --git a/ydb/core/load_test/events.h b/ydb/core/load_test/events.h
index 7ec00e9588..d698f8b244 100644
--- a/ydb/core/load_test/events.h
+++ b/ydb/core/load_test/events.h
@@ -76,8 +76,8 @@ struct TEvLoad {
struct TEvLoadTestFinished : public TEventLocal<TEvLoadTestFinished, TEvLoad::EvLoadTestFinished> {
ui64 Tag;
- TIntrusivePtr<TLoadReport> Report; // nullptr indicates error
- TString ErrorReason;
+ TIntrusivePtr<TLoadReport> Report; // nullptr indicates an error or an early stop
+ TString ErrorReason; // human readable status, might be nonempty even in the case of success
TString LastHtmlPage;
NJson::TJsonValue JsonResult;
diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp
index acadc60203..56b30b9c59 100644
--- a/ydb/core/load_test/group_write.cpp
+++ b/ydb/core/load_test/group_write.cpp
@@ -752,7 +752,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
::NMonitoring::TDynamicCounters::TCounterPtr ScheduleCounter;
- ui32 TestStoppedRecieved = 0;
+ ui32 TestStoppedReceived = 0;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -851,8 +851,8 @@ public:
}
void HandleStopTest(const TActorContext& ctx) {
- ++TestStoppedRecieved;
- if (TestStoppedRecieved == TabletWriters.size()) {
+ ++TestStoppedReceived;
+ if (TestStoppedReceived == TabletWriters.size()) {
ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, "HandleStopTest"));
Die(ctx);
}
diff --git a/ydb/core/load_test/keyvalue_write.cpp b/ydb/core/load_test/keyvalue_write.cpp
index 2a1eff346b..b23eff366d 100644
--- a/ydb/core/load_test/keyvalue_write.cpp
+++ b/ydb/core/load_test/keyvalue_write.cpp
@@ -124,6 +124,7 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
// Monitoring
TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters;
TInstant TestStartTime;
+ bool EarlyStop = false;
ui64 TabletId;
TActorId Pipe;
@@ -195,6 +196,7 @@ public:
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, "
<< " all workers is initialized, start test");
}
+ EarlyStop = false;
Connect(ctx);
}
@@ -203,6 +205,7 @@ public:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void HandlePoisonPill(const TActorContext& ctx) {
+ EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds;
if (OwnerInitInProgress) {
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, "
<< "not all workers is initialized, so wait them to end initialization");
@@ -217,9 +220,14 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag
<< " TKeyValueWriterLoadTestActor StartDeathProcess called");
Become(&TKeyValueWriterLoadTestActor::StateEndOfWork);
- TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport());
- Report->Duration = TDuration::Seconds(DurationSeconds);
- ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess"));
+ TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr;
+ if (!EarlyStop) {
+ report.Reset(new TEvLoad::TLoadReport());
+ report->Duration = TDuration::Seconds(DurationSeconds);
+ }
+ const TString errorReason = EarlyStop ?
+ "Abort, stop signal received" : "OK, called StartDeathProcess";
+ ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp
index 976ff1de3f..a87093e900 100644
--- a/ydb/core/load_test/kqp.cpp
+++ b/ydb/core/load_test/kqp.cpp
@@ -328,7 +328,7 @@ public:
}
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill");
- // TODO: report error in case of such death
+ EarlyStop = false;
ctx.Schedule(TDuration::Seconds(DurationSeconds + 10), new TEvents::TEvPoisonPill);
CreateSessionForTablesDDL(ctx);
@@ -336,12 +336,12 @@ public:
void HandleWakeup(const TActorContext& ctx) {
if (ResultsReceived) {
- // if death process is started, then brake wakeup circuit
+ // if death process is started, then break wakeup circuit
return;
}
size_t targetSessions;
if (IncreaseSessions) {
- targetSessions = 1 + NumOfSessions * (TInstant::Now() - Start).Seconds() / DurationSeconds;
+ targetSessions = 1 + NumOfSessions * (TAppData::TimeProvider->Now() - TestStartTime).Seconds() / DurationSeconds;
targetSessions = std::min(targetSessions, NumOfSessions);
} else {
targetSessions = NumOfSessions;
@@ -376,6 +376,7 @@ private:
// death
void HandlePoisonPill(const TActorContext& ctx) {
+ EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds;
LOG_CRIT_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, "
<< "but it is supposed to pass away by receiving TEvKqpWorkerResponse from all of the workers");
StartDeathProcess(ctx);
@@ -422,10 +423,19 @@ private:
void DeathReport(const TActorContext& ctx) {
CloseSession(ctx);
- TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport());
- Report->Duration = TDuration::Seconds(DurationSeconds);
+ TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr;
+ TString errorReason;
+ if (ResultsReceived >= Workers.size()) {
+ report.Reset(new TEvLoad::TLoadReport());
+ report->Duration = TDuration::Seconds(DurationSeconds);
+ errorReason = "OK, called StartDeathProcess";
+ } else if (EarlyStop) {
+ errorReason = "Abort, stop signal received";
+ } else {
+ errorReason = "Abort, timeout";
+ }
- auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK, called StartDeathProcess");
+ auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason);
finishEv->LastHtmlPage = RenderHTML();
finishEv->JsonResult = GetJsonResult();
ctx.Send(Parent, finishEv);
@@ -544,7 +554,7 @@ private:
if (InitData.empty()) {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " initial query is executed, going to create workers");
- Start = TInstant::Now();
+ TestStartTime = TAppData::TimeProvider->Now();
if (IncreaseSessions) {
ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
} else {
@@ -592,8 +602,8 @@ private:
TABLEBODY() {
TABLER() {
TABLED() {
- if (Start) {
- str << (TInstant::Now() - Start).Seconds() << " / " << DurationSeconds;
+ if (TestStartTime) {
+ str << (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << " / " << DurationSeconds;
} else {
str << -1 << " / " << DurationSeconds;
}
@@ -629,7 +639,7 @@ private:
WorkloadType,
Tag,
Workers.size(),
- Start + TDuration::Seconds(DurationSeconds),
+ TestStartTime + TDuration::Seconds(DurationSeconds),
Transactions,
TransactionsBytesWritten);
Workers.push_back(ctx.Register(worker));
@@ -646,7 +656,8 @@ private:
ctx.Send(kqp_proxy, ev.Release());
}
- TInstant Start;
+ TInstant TestStartTime;
+ bool EarlyStop = false;
TString TableSession = "wrong sessionId";
TString WorkingDir;
ui64 WorkloadType;
diff --git a/ydb/core/load_test/memory.cpp b/ydb/core/load_test/memory.cpp
index 0346f5952f..a9ecad4ce6 100644
--- a/ydb/core/load_test/memory.cpp
+++ b/ydb/core/load_test/memory.cpp
@@ -18,10 +18,12 @@ class TMemoryLoadTestActor : public TActorBootstrapped<TMemoryLoadTestActor> {
const ui64 Tag;
TDuration Duration;
+ ui32 DurationSeconds;
ui64 BlockSize;
TDuration Interval;
TInstant TestStartTime;
+ bool EarlyStop = false;
TVector<TVector<char>> Blocks;
ui64 AllocatedSize = 0;
@@ -40,6 +42,7 @@ public:
VERIFY_PARAM(DurationSeconds);
Duration = TDuration::Seconds(cmd.GetDurationSeconds());
+ DurationSeconds = cmd.GetDurationSeconds();
VERIFY_PARAM(BlockSize);
BlockSize = cmd.GetBlockSize();
@@ -62,15 +65,22 @@ public:
ctx.Schedule(Duration, new TEvents::TEvPoisonPill);
ctx.Schedule(Interval, new TEvAllocateBlock);
TestStartTime = TAppData::TimeProvider->Now();
+ EarlyStop = false;
}
void HandlePoisonPill(const TActorContext& ctx) {
+ EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds;
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag
<< " Handle PoisonPill");
- TIntrusivePtr<TEvLoad::TLoadReport> report(new TEvLoad::TLoadReport());
- report->Duration = Duration;
- ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, "OK"));
+ TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr;
+ if (!EarlyStop) {
+ report.Reset(new TEvLoad::TLoadReport());
+ report->Duration = Duration;
+ }
+ const TString errorReason = EarlyStop ?
+ "Abort, stop signal received" : "OK, called StartDeathProcess";
+ ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason));
Die(ctx);
}
@@ -108,8 +118,7 @@ public:
}
TABLEBODY() {
PARAM("Elapsed time / Duration",
- (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / "
- << Duration.Seconds() << "s");
+ (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / " << DurationSeconds << "s");
PARAM("Interval", Interval.MicroSeconds() << "us");
PARAM("Block size", BlockSize);
PARAM("Allocated bytes", AllocatedSize);
@@ -117,6 +126,7 @@ public:
}
}
}
+#undef PARAM
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
}
diff --git a/ydb/core/load_test/pdisk_log.cpp b/ydb/core/load_test/pdisk_log.cpp
index 97fd9e0b58..b3fc489e58 100644
--- a/ydb/core/load_test/pdisk_log.cpp
+++ b/ydb/core/load_test/pdisk_log.cpp
@@ -563,10 +563,10 @@ public:
<< " GetReallyWrittenBytes()# " << worker->GetReallyWrittenBytes()
<< " GetGlobalWrittenBytes()# " << worker->GetGlobalWrittenBytes());
}
- auto report = std::make_unique<TEvLoad::TLoadReport>();
+ TIntrusivePtr<TEvLoad::TLoadReport> report = new TEvLoad::TLoadReport();
report->LoadType = TEvLoad::TLoadReport::LOAD_LOG_WRITE;
report->Duration = TAppData::TimeProvider->Now() - TestStartTime;
- ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report.release(), "OK"));
+ ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, "OK"));
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " End of work, TEvLoadTestFinished is sent");
Die(ctx);
}
diff --git a/ydb/core/load_test/service_actor.cpp b/ydb/core/load_test/service_actor.cpp
index a7fe093c00..d0cd2082de 100644
--- a/ydb/core/load_test/service_actor.cpp
+++ b/ydb/core/load_test/service_actor.cpp
@@ -605,7 +605,7 @@ public:
const TString& uuid = UuidByTag.at(msg->Tag);
record.SetUuid(uuid);
record.SetNodeId(SelfId().NodeId());
- record.SetSuccess(msg->Report != nullptr); // TODO check how load actors set the field
+ record.SetSuccess(msg->Report != nullptr);
record.SetFinishTimestamp(finishTime.Seconds());
record.SetErrorReason(msg->ErrorReason);
diff --git a/ydb/core/load_test/vdisk_write.cpp b/ydb/core/load_test/vdisk_write.cpp
index c5da30e855..7fc29f41db 100644
--- a/ydb/core/load_test/vdisk_write.cpp
+++ b/ydb/core/load_test/vdisk_write.cpp
@@ -326,6 +326,8 @@ namespace NKikimr {
}
}
}
+#undef NAMED_PARAM
+#undef PARAM
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
}