diff options
author | tarum <tarum@yandex-team.com> | 2023-04-17 16:27:57 +0300 |
---|---|---|
committer | tarum <tarum@yandex-team.com> | 2023-04-17 16:27:57 +0300 |
commit | bd6d11bf605a9ccfbbcd6a4ed5b9a1ee5ab18c03 (patch) | |
tree | de114113ae86f1d0bc8b7cc7e1be6ca671f2cfae | |
parent | 37c06c5b32df138ddbb62bd322e57ddc3d7e27e1 (diff) | |
download | ydb-bd6d11bf605a9ccfbbcd6a4ed5b9a1ee5ab18c03.tar.gz |
Improve consistency in how finish is reported across load actors, fix few smaller issues
-rw-r--r-- | ydb/core/load_test/events.h | 4 | ||||
-rw-r--r-- | ydb/core/load_test/group_write.cpp | 6 | ||||
-rw-r--r-- | ydb/core/load_test/keyvalue_write.cpp | 14 | ||||
-rw-r--r-- | ydb/core/load_test/kqp.cpp | 33 | ||||
-rw-r--r-- | ydb/core/load_test/memory.cpp | 20 | ||||
-rw-r--r-- | ydb/core/load_test/pdisk_log.cpp | 4 | ||||
-rw-r--r-- | ydb/core/load_test/service_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/load_test/vdisk_write.cpp | 2 |
8 files changed, 58 insertions, 27 deletions
diff --git a/ydb/core/load_test/events.h b/ydb/core/load_test/events.h index 7ec00e9588..d698f8b244 100644 --- a/ydb/core/load_test/events.h +++ b/ydb/core/load_test/events.h @@ -76,8 +76,8 @@ struct TEvLoad { struct TEvLoadTestFinished : public TEventLocal<TEvLoadTestFinished, TEvLoad::EvLoadTestFinished> { ui64 Tag; - TIntrusivePtr<TLoadReport> Report; // nullptr indicates error - TString ErrorReason; + TIntrusivePtr<TLoadReport> Report; // nullptr indicates an error or an early stop + TString ErrorReason; // human readable status, might be nonempty even in the case of success TString LastHtmlPage; NJson::TJsonValue JsonResult; diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index acadc60203..56b30b9c59 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -752,7 +752,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo ::NMonitoring::TDynamicCounters::TCounterPtr ScheduleCounter; - ui32 TestStoppedRecieved = 0; + ui32 TestStoppedReceived = 0; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -851,8 +851,8 @@ public: } void HandleStopTest(const TActorContext& ctx) { - ++TestStoppedRecieved; - if (TestStoppedRecieved == TabletWriters.size()) { + ++TestStoppedReceived; + if (TestStoppedReceived == TabletWriters.size()) { ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, "HandleStopTest")); Die(ctx); } diff --git a/ydb/core/load_test/keyvalue_write.cpp b/ydb/core/load_test/keyvalue_write.cpp index 2a1eff346b..b23eff366d 100644 --- a/ydb/core/load_test/keyvalue_write.cpp +++ b/ydb/core/load_test/keyvalue_write.cpp @@ -124,6 +124,7 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo // Monitoring TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters; TInstant TestStartTime; + bool EarlyStop = false; ui64 TabletId; TActorId Pipe; @@ -195,6 +196,7 @@ public: LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, " << " all workers is initialized, start test"); } + EarlyStop = false; Connect(ctx); } @@ -203,6 +205,7 @@ public: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void HandlePoisonPill(const TActorContext& ctx) { + EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds; if (OwnerInitInProgress) { LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, " << "not all workers is initialized, so wait them to end initialization"); @@ -217,9 +220,14 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " TKeyValueWriterLoadTestActor StartDeathProcess called"); Become(&TKeyValueWriterLoadTestActor::StateEndOfWork); - TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport()); - Report->Duration = TDuration::Seconds(DurationSeconds); - ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess")); + TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr; + if (!EarlyStop) { + report.Reset(new TEvLoad::TLoadReport()); + report->Duration = TDuration::Seconds(DurationSeconds); + } + const TString errorReason = EarlyStop ? + "Abort, stop signal received" : "OK, called StartDeathProcess"; + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason)); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); } diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp index 976ff1de3f..a87093e900 100644 --- a/ydb/core/load_test/kqp.cpp +++ b/ydb/core/load_test/kqp.cpp @@ -328,7 +328,7 @@ public: } LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); - // TODO: report error in case of such death + EarlyStop = false; ctx.Schedule(TDuration::Seconds(DurationSeconds + 10), new TEvents::TEvPoisonPill); CreateSessionForTablesDDL(ctx); @@ -336,12 +336,12 @@ public: void HandleWakeup(const TActorContext& ctx) { if (ResultsReceived) { - // if death process is started, then brake wakeup circuit + // if death process is started, then break wakeup circuit return; } size_t targetSessions; if (IncreaseSessions) { - targetSessions = 1 + NumOfSessions * (TInstant::Now() - Start).Seconds() / DurationSeconds; + targetSessions = 1 + NumOfSessions * (TAppData::TimeProvider->Now() - TestStartTime).Seconds() / DurationSeconds; targetSessions = std::min(targetSessions, NumOfSessions); } else { targetSessions = NumOfSessions; @@ -376,6 +376,7 @@ private: // death void HandlePoisonPill(const TActorContext& ctx) { + EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds; LOG_CRIT_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, " << "but it is supposed to pass away by receiving TEvKqpWorkerResponse from all of the workers"); StartDeathProcess(ctx); @@ -422,10 +423,19 @@ private: void DeathReport(const TActorContext& ctx) { CloseSession(ctx); - TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport()); - Report->Duration = TDuration::Seconds(DurationSeconds); + TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr; + TString errorReason; + if (ResultsReceived >= Workers.size()) { + report.Reset(new TEvLoad::TLoadReport()); + report->Duration = TDuration::Seconds(DurationSeconds); + errorReason = "OK, called StartDeathProcess"; + } else if (EarlyStop) { + errorReason = "Abort, stop signal received"; + } else { + errorReason = "Abort, timeout"; + } - auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK, called StartDeathProcess"); + auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason); finishEv->LastHtmlPage = RenderHTML(); finishEv->JsonResult = GetJsonResult(); ctx.Send(Parent, finishEv); @@ -544,7 +554,7 @@ private: if (InitData.empty()) { LOG_NOTICE_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " initial query is executed, going to create workers"); - Start = TInstant::Now(); + TestStartTime = TAppData::TimeProvider->Now(); if (IncreaseSessions) { ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); } else { @@ -592,8 +602,8 @@ private: TABLEBODY() { TABLER() { TABLED() { - if (Start) { - str << (TInstant::Now() - Start).Seconds() << " / " << DurationSeconds; + if (TestStartTime) { + str << (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << " / " << DurationSeconds; } else { str << -1 << " / " << DurationSeconds; } @@ -629,7 +639,7 @@ private: WorkloadType, Tag, Workers.size(), - Start + TDuration::Seconds(DurationSeconds), + TestStartTime + TDuration::Seconds(DurationSeconds), Transactions, TransactionsBytesWritten); Workers.push_back(ctx.Register(worker)); @@ -646,7 +656,8 @@ private: ctx.Send(kqp_proxy, ev.Release()); } - TInstant Start; + TInstant TestStartTime; + bool EarlyStop = false; TString TableSession = "wrong sessionId"; TString WorkingDir; ui64 WorkloadType; diff --git a/ydb/core/load_test/memory.cpp b/ydb/core/load_test/memory.cpp index 0346f5952f..a9ecad4ce6 100644 --- a/ydb/core/load_test/memory.cpp +++ b/ydb/core/load_test/memory.cpp @@ -18,10 +18,12 @@ class TMemoryLoadTestActor : public TActorBootstrapped<TMemoryLoadTestActor> { const ui64 Tag; TDuration Duration; + ui32 DurationSeconds; ui64 BlockSize; TDuration Interval; TInstant TestStartTime; + bool EarlyStop = false; TVector<TVector<char>> Blocks; ui64 AllocatedSize = 0; @@ -40,6 +42,7 @@ public: VERIFY_PARAM(DurationSeconds); Duration = TDuration::Seconds(cmd.GetDurationSeconds()); + DurationSeconds = cmd.GetDurationSeconds(); VERIFY_PARAM(BlockSize); BlockSize = cmd.GetBlockSize(); @@ -62,15 +65,22 @@ public: ctx.Schedule(Duration, new TEvents::TEvPoisonPill); ctx.Schedule(Interval, new TEvAllocateBlock); TestStartTime = TAppData::TimeProvider->Now(); + EarlyStop = false; } void HandlePoisonPill(const TActorContext& ctx) { + EarlyStop = (TAppData::TimeProvider->Now() - TestStartTime).Seconds() < DurationSeconds; LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " Handle PoisonPill"); - TIntrusivePtr<TEvLoad::TLoadReport> report(new TEvLoad::TLoadReport()); - report->Duration = Duration; - ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, "OK")); + TIntrusivePtr<TEvLoad::TLoadReport> report = nullptr; + if (!EarlyStop) { + report.Reset(new TEvLoad::TLoadReport()); + report->Duration = Duration; + } + const TString errorReason = EarlyStop ? + "Abort, stop signal received" : "OK, called StartDeathProcess"; + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason)); Die(ctx); } @@ -108,8 +118,7 @@ public: } TABLEBODY() { PARAM("Elapsed time / Duration", - (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / " - << Duration.Seconds() << "s"); + (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / " << DurationSeconds << "s"); PARAM("Interval", Interval.MicroSeconds() << "us"); PARAM("Block size", BlockSize); PARAM("Allocated bytes", AllocatedSize); @@ -117,6 +126,7 @@ public: } } } +#undef PARAM ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId)); } diff --git a/ydb/core/load_test/pdisk_log.cpp b/ydb/core/load_test/pdisk_log.cpp index 97fd9e0b58..b3fc489e58 100644 --- a/ydb/core/load_test/pdisk_log.cpp +++ b/ydb/core/load_test/pdisk_log.cpp @@ -563,10 +563,10 @@ public: << " GetReallyWrittenBytes()# " << worker->GetReallyWrittenBytes() << " GetGlobalWrittenBytes()# " << worker->GetGlobalWrittenBytes()); } - auto report = std::make_unique<TEvLoad::TLoadReport>(); + TIntrusivePtr<TEvLoad::TLoadReport> report = new TEvLoad::TLoadReport(); report->LoadType = TEvLoad::TLoadReport::LOAD_LOG_WRITE; report->Duration = TAppData::TimeProvider->Now() - TestStartTime; - ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report.release(), "OK")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, "OK")); LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " End of work, TEvLoadTestFinished is sent"); Die(ctx); } diff --git a/ydb/core/load_test/service_actor.cpp b/ydb/core/load_test/service_actor.cpp index a7fe093c00..d0cd2082de 100644 --- a/ydb/core/load_test/service_actor.cpp +++ b/ydb/core/load_test/service_actor.cpp @@ -605,7 +605,7 @@ public: const TString& uuid = UuidByTag.at(msg->Tag); record.SetUuid(uuid); record.SetNodeId(SelfId().NodeId()); - record.SetSuccess(msg->Report != nullptr); // TODO check how load actors set the field + record.SetSuccess(msg->Report != nullptr); record.SetFinishTimestamp(finishTime.Seconds()); record.SetErrorReason(msg->ErrorReason); diff --git a/ydb/core/load_test/vdisk_write.cpp b/ydb/core/load_test/vdisk_write.cpp index c5da30e855..7fc29f41db 100644 --- a/ydb/core/load_test/vdisk_write.cpp +++ b/ydb/core/load_test/vdisk_write.cpp @@ -326,6 +326,8 @@ namespace NKikimr { } } } +#undef NAMED_PARAM +#undef PARAM ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId)); } |