diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-14 18:51:29 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-14 18:51:29 +0300 |
commit | d92eea742a025c8c9a888871a38ae01105d4e29b (patch) | |
tree | 7d71db1756a97afe57bf479343421c81ecc3bb23 | |
parent | efc6662d04467bb770d618638a26ab31ae6356b0 (diff) | |
download | ydb-d92eea742a025c8c9a888871a38ae01105d4e29b.tar.gz |
Rename TestLoad -> LoadTest
-rw-r--r-- | ydb/core/base/blobstorage.h | 5 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_load.cpp | 8 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/load_test/defs.h | 2 | ||||
-rw-r--r-- | ydb/core/load_test/events.h | 84 | ||||
-rw-r--r-- | ydb/core/load_test/group_write.cpp | 12 | ||||
-rw-r--r-- | ydb/core/load_test/interval_gen.h | 3 | ||||
-rw-r--r-- | ydb/core/load_test/keyvalue_write.cpp | 36 | ||||
-rw-r--r-- | ydb/core/load_test/kqp.cpp | 12 | ||||
-rw-r--r-- | ydb/core/load_test/memory.cpp | 18 | ||||
-rw-r--r-- | ydb/core/load_test/pdisk_log.cpp | 38 | ||||
-rw-r--r-- | ydb/core/load_test/pdisk_read.cpp | 22 | ||||
-rw-r--r-- | ydb/core/load_test/pdisk_write.cpp | 34 | ||||
-rw-r--r-- | ydb/core/load_test/service_actor.cpp | 56 | ||||
-rw-r--r-- | ydb/core/load_test/service_actor.h | 92 | ||||
-rw-r--r-- | ydb/core/load_test/size_gen.h | 2 | ||||
-rw-r--r-- | ydb/core/load_test/vdisk_write.cpp | 6 | ||||
-rw-r--r-- | ydb/core/protos/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/protos/load_test.proto (renamed from ydb/core/protos/testload.proto) | 4 | ||||
-rw-r--r-- | ydb/core/protos/msgbus.proto | 4 |
20 files changed, 218 insertions, 224 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index f05dfb429d..e5ed8f25f3 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -850,11 +850,6 @@ struct TEvBlobStorage { EvVMockCtlRequest, EvVMockCtlResponse, - // load actor control - EvTestLoadRequest = EvPut + 16 * 512, - EvTestLoadFinished, - EvTestLoadResponse, - // incremental huge blob keeper EvIncrHugeInit = EvPut + 17 * 512, EvIncrHugeInitResult, diff --git a/ydb/core/client/server/msgbus_server_load.cpp b/ydb/core/client/server/msgbus_server_load.cpp index 3dff97bc88..d40ba1063a 100644 --- a/ydb/core/client/server/msgbus_server_load.cpp +++ b/ydb/core/client/server/msgbus_server_load.cpp @@ -6,7 +6,7 @@ namespace NMsgBusProxy { class TBsTestLoadActorRequest : public TActorBootstrapped<TBsTestLoadActorRequest>, public TMessageBusSessionIdentHolder { TVector<ui32> NodeIds; - NKikimr::TEvTestLoadRequest Cmd; + NKikimr::TEvLoadTestRequest Cmd; NKikimrClient::TBsTestLoadResponse Response; ui32 ResponsesPending; @@ -24,7 +24,7 @@ public: void Bootstrap(const TActorContext& ctx) { for (ui32 nodeId : NodeIds) { - auto msg = MakeHolder<TEvLoad::TEvTestLoadRequest>(); + auto msg = MakeHolder<TEvLoad::TEvLoadTestRequest>(); msg->Record = Cmd; msg->Record.SetCookie(nodeId); ctx.Send(MakeLoadServiceID(nodeId), msg.Release()); @@ -35,7 +35,7 @@ public: CheckResponse(ctx); } - void Handle(TEvLoad::TEvTestLoadResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoad::TEvLoadTestResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; ui32 nodeId = record.GetCookie(); --ResponsesPending; @@ -63,7 +63,7 @@ public: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { - HFunc(TEvLoad::TEvTestLoadResponse, Handle); + HFunc(TEvLoad::TEvLoadTestResponse, Handle); } } }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index fbb9ee6c82..23dc30d71a 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1855,7 +1855,7 @@ TLoadInitializer::TLoadInitializer(const TKikimrRunConfig& runConfig) {} void TLoadInitializer::InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) { - IActor *bsActor = CreateTestLoadActor(appData->Counters); + IActor *bsActor = CreateLoadTestActor(appData->Counters); setup->LocalServices.emplace_back(MakeLoadServiceID(NodeId), TActorSetupCmd(bsActor, TMailboxType::HTSwap, appData->UserPoolId)); // FIXME: correct service id diff --git a/ydb/core/load_test/defs.h b/ydb/core/load_test/defs.h index a413c00f7a..1cded7724f 100644 --- a/ydb/core/load_test/defs.h +++ b/ydb/core/load_test/defs.h @@ -5,7 +5,7 @@ #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/log.h> +#include <ydb/core/protos/load_test.pb.h> #include <ydb/core/protos/services.pb.h> -#include <ydb/core/protos/testload.pb.h> #include <ydb/core/load_test/events.h> diff --git a/ydb/core/load_test/events.h b/ydb/core/load_test/events.h index 15a7e0cca7..8b84db914d 100644 --- a/ydb/core/load_test/events.h +++ b/ydb/core/load_test/events.h @@ -1,21 +1,91 @@ #include <ydb/core/base/events.h> -#include <ydb/core/protos/testload.pb.h> +#include <ydb/core/protos/load_test.pb.h> + +#include <library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h> +#include <library/cpp/json/writer/json_value.h> namespace NKikimr { struct TEvLoad { enum EEv { - EvTestLoadRequest = EventSpaceBegin(TKikimrEvents::ES_TEST_LOAD), - EvTestLoadResponse, + EvLoadTestRequest = EventSpaceBegin(TKikimrEvents::ES_TEST_LOAD), + EvLoadTestFinished, + EvLoadTestResponse, }; - struct TEvTestLoadRequest : public TEventPB<TEvTestLoadRequest, - NKikimr::TEvTestLoadRequest, EvTestLoadRequest> + struct TEvLoadTestRequest : public TEventPB<TEvLoadTestRequest, + NKikimr::TEvLoadTestRequest, EvLoadTestRequest> {}; - struct TEvTestLoadResponse : public TEventPB<TEvTestLoadResponse, - NKikimr::TEvTestLoadResponse, EvTestLoadResponse> + struct TEvLoadTestResponse : public TEventPB<TEvLoadTestResponse, + NKikimr::TEvLoadTestResponse, EvLoadTestResponse> {}; + + struct TLoadReport : public TThrRefBase { + enum ELoadType { + LOAD_READ, + LOAD_WRITE, + LOAD_LOG_WRITE, + }; + + TDuration Duration; + ui64 Size; + ui32 InFlight; + TVector<ui64> RwSpeedBps; + ELoadType LoadType; + NMonitoring::TPercentileTrackerLg<10, 4, 1> LatencyUs; // Upper threshold of this tracker is ~134 seconds, size is 256kB + TMap<double, ui64> DeviceLatency; + + double GetAverageSpeed() const { + if (RwSpeedBps.size() < 1) { + return 0; + } + double avg = 0; + for (const ui64& speed : RwSpeedBps) { + avg += speed; + } + avg /= RwSpeedBps.size(); + return avg; + } + + double GetSpeedDeviation() const { + if (RwSpeedBps.size() <= 1) { + return 0; + } + i64 avg = (i64)GetAverageSpeed(); + double sd = 0; + for (const ui64& speed : RwSpeedBps) { + sd += ((i64)speed - avg) * ((i64)speed - avg); + } + sd /= RwSpeedBps.size(); + return std::sqrt(sd); + } + + TString LoadTypeName() const { + switch (LoadType) { + case LOAD_READ: + return "read"; + case LOAD_WRITE: + return "write"; + case LOAD_LOG_WRITE: + return "log_write"; + } + } + }; + + struct TEvLoadTestFinished : public TEventLocal<TEvLoadTestFinished, TEvLoad::EvLoadTestFinished> { + ui64 Tag; + TIntrusivePtr<TLoadReport> Report; // nullptr indicates error + TString ErrorReason; + TString LastHtmlPage; + NJson::TJsonValue JsonResult; + + TEvLoadTestFinished(ui64 tag, TIntrusivePtr<TLoadReport> report, TString errorReason) + : Tag(tag) + , Report(report) + , ErrorReason(errorReason) + {} + }; }; } diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index 266c195c57..8eeca805a3 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -19,7 +19,7 @@ namespace NKikimr { -class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActor> { +class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActor> { class TWakeupQueue { using TCallback = std::function<void(const TActorContext&)>; @@ -771,7 +771,7 @@ public: return NKikimrServices::TActivity::BS_LOAD_ACTOR; } - TLogWriterTestLoadActor(const NKikimr::TEvTestLoadRequest::TLoadStart& cmd, const TActorId& parent, + TLogWriterLoadTestActor(const NKikimr::TEvLoadTestRequest::TLoadStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) : Tag(tag) , Parent(parent) @@ -851,7 +851,7 @@ public: } void Bootstrap(const TActorContext& ctx) { - Become(&TLogWriterTestLoadActor::StateFunc); + Become(&TLogWriterLoadTestActor::StateFunc); if (TestDuration) { ctx.Schedule(*TestDuration, new TEvents::TEvPoisonPill()); } @@ -872,7 +872,7 @@ public: void HandleStopTest(const TActorContext& ctx) { ++TestStoppedRecieved; if (TestStoppedRecieved == TabletWriters.size()) { - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, "HandleStopTest")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, "HandleStopTest")); Die(ctx); } } @@ -971,9 +971,9 @@ public: ) }; -IActor *CreateWriterTestLoad(const NKikimr::TEvTestLoadRequest::TLoadStart& cmd, const TActorId& parent, +IActor *CreateWriterLoadTest(const NKikimr::TEvLoadTestRequest::TLoadStart& cmd, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) { - return new TLogWriterTestLoadActor(cmd, parent, std::move(counters), tag); + return new TLogWriterLoadTestActor(cmd, parent, std::move(counters), tag); } } // NKikimr diff --git a/ydb/core/load_test/interval_gen.h b/ydb/core/load_test/interval_gen.h index c2e47de04d..1f8488afe3 100644 --- a/ydb/core/load_test/interval_gen.h +++ b/ydb/core/load_test/interval_gen.h @@ -2,7 +2,6 @@ #include "defs.h" #include "gen.h" -#include <ydb/core/protos/testload.pb.h> #include <util/generic/variant.h> namespace NKikimr { @@ -50,7 +49,7 @@ namespace NKikimr { struct TItem : public std::variant<TUniformItem, TPoissonItem> { using TBase = std::variant<TUniformItem, TPoissonItem>; - TItem(const NKikimr::TEvTestLoadRequest::TIntervalInfo& x) + TItem(const NKikimr::TEvLoadTestRequest::TIntervalInfo& x) : TBase(CreateVariantFromProtobuf(x)) {} diff --git a/ydb/core/load_test/keyvalue_write.cpp b/ydb/core/load_test/keyvalue_write.cpp index d45fe4d8d1..8b27e707e9 100644 --- a/ydb/core/load_test/keyvalue_write.cpp +++ b/ydb/core/load_test/keyvalue_write.cpp @@ -10,7 +10,7 @@ #include <util/generic/queue.h> namespace NKikimr { -class TKeyValueWriterTestLoadActor; +class TKeyValueWriterLoadTestActor; #define PARAM(NAME, VALUE) \ TABLER() { \ @@ -19,7 +19,7 @@ class TKeyValueWriterTestLoadActor; } class TWorker { - friend class TKeyValueWriterTestLoadActor; + friend class TKeyValueWriterLoadTestActor; TString KeyPrefix; TControlWrapper MaxInFlight; @@ -36,7 +36,7 @@ class TWorker { bool IsDying = false; public: - TWorker(const NKikimr::TEvTestLoadRequest::TKeyValueLoadStart::TWorkerConfig& cmd, + TWorker(const NKikimr::TEvLoadTestRequest::TKeyValueLoadStart::TWorkerConfig& cmd, ui32 idx, TReallyFastRng32 *gen) : MaxInFlight(1, 0, 65536) , Idx(idx) @@ -91,7 +91,7 @@ public: } }; -class TKeyValueWriterTestLoadActor : public TActorBootstrapped<TKeyValueWriterTestLoadActor> { +class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLoadTestActor> { struct TRequestInfo { ui32 Size; TInstant LogStartTime; @@ -137,7 +137,7 @@ public: return NKikimrServices::TActivity::BS_LOAD_PDISK_LOG_WRITE; } - TKeyValueWriterTestLoadActor(const NKikimr::TEvTestLoadRequest::TKeyValueLoadStart& cmd, + TKeyValueWriterLoadTestActor(const NKikimr::TEvLoadTestRequest::TKeyValueLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) @@ -166,12 +166,12 @@ public: ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles); } - ~TKeyValueWriterTestLoadActor() { + ~TKeyValueWriterLoadTestActor() { LoadCounters->ResetCounters(); } void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " TKeyValueWriterTestLoadActor Connect called"); + LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " TKeyValueWriterLoadTestActor Connect called"); Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); for (auto& worker : Workers) { worker->ItemsInFlight = 0; @@ -181,8 +181,8 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag - << " TKeyValueWriterTestLoadActor Bootstrap called"); - Become(&TKeyValueWriterTestLoadActor::StateFunc); + << " TKeyValueWriterLoadTestActor Bootstrap called"); + Become(&TKeyValueWriterLoadTestActor::StateFunc); LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); @@ -215,11 +215,11 @@ public: void StartDeathProcess(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag - << " TKeyValueWriterTestLoadActor StartDeathProcess called"); - Become(&TKeyValueWriterTestLoadActor::StateEndOfWork); - TIntrusivePtr<TLoadReport> Report(new TLoadReport()); + << " TKeyValueWriterLoadTestActor StartDeathProcess called"); + Become(&TKeyValueWriterLoadTestActor::StateEndOfWork); + TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport()); Report->Duration = TDuration::Seconds(DurationSeconds); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess")); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); } @@ -266,7 +266,7 @@ public: TStringStream str; str << " TEvKeyValue::TEvResponse is not OK, msg.ToString()# " << msg->ToString(); LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); return; @@ -317,7 +317,7 @@ public: TEvTabletPipe::TEvClientConnected *msg = ev->Get(); LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag - << " TKeyValueWriterTestLoadActor Handle TEvClientConnected called, Status# " << msg->Status); + << " TKeyValueWriterLoadTestActor Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { if (msg->ClientId == Pipe) { @@ -330,7 +330,7 @@ public: void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag - << " TKeyValueWriterTestLoadActor Handle TEvClientDestroyed called"); + << " TKeyValueWriterLoadTestActor Handle TEvClientDestroyed called"); TEvTabletPipe::TEvClientDestroyed *msg = ev->Get(); if (msg->ClientId == Pipe) { Pipe = TActorId(); @@ -359,9 +359,9 @@ public: ) }; -IActor * CreateKeyValueWriterTestLoad(const NKikimr::TEvTestLoadRequest::TKeyValueLoadStart& cmd, +IActor * CreateKeyValueWriterLoadTest(const NKikimr::TEvLoadTestRequest::TKeyValueLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TKeyValueWriterTestLoadActor(cmd, parent, counters, index, tag); + return new TKeyValueWriterLoadTestActor(cmd, parent, counters, index, tag); } } // NKikimr diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp index 66bddc8cc5..d7e5141352 100644 --- a/ydb/core/load_test/kqp.cpp +++ b/ydb/core/load_test/kqp.cpp @@ -298,7 +298,7 @@ public: return NKikimrServices::TActivity::KQP_TEST_WORKLOAD; } - TKqpLoadActor(const NKikimr::TEvTestLoadRequest::TKqpLoadStart& cmd, + TKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) @@ -326,7 +326,7 @@ public: NYdbWorkload::TWorkloadFactory factory; - if (cmd.Workload_case() == NKikimr::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { + if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoadStart::WorkloadCase::kStock) { WorkloadClass = NYdbWorkload::EWorkload::STOCK; NYdbWorkload::TStockWorkloadParams params; params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad(); @@ -337,7 +337,7 @@ public: params.DbPath = WorkingDir; params.MinPartitions = UniformPartitionsCount; WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - } else if (cmd.Workload_case() == NKikimr::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kKv) { + } else if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoadStart::WorkloadCase::kKv) { WorkloadClass = NYdbWorkload::EWorkload::KV; NYdbWorkload::TKvWorkloadParams params; params.InitRowCount = cmd.GetKv().GetInitRowCount(); @@ -469,10 +469,10 @@ private: void DeathReport(const TActorContext& ctx) { CloseSession(ctx); - TIntrusivePtr<TLoadReport> Report(new TLoadReport()); + TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport()); Report->Duration = TDuration::Seconds(DurationSeconds); - auto* finishEv = new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"); + auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess"); finishEv->LastHtmlPage = RenderHTML(); finishEv->JsonResult = GetJsonResult(); ctx.Send(Parent, finishEv); @@ -749,7 +749,7 @@ private: }; -IActor * CreateKqpLoadActor(const NKikimr::TEvTestLoadRequest::TKqpLoadStart& cmd, +IActor * CreateKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { return new TKqpLoadActor(cmd, parent, counters, index, tag); } diff --git a/ydb/core/load_test/memory.cpp b/ydb/core/load_test/memory.cpp index 88ff01c091..29b55b57af 100644 --- a/ydb/core/load_test/memory.cpp +++ b/ydb/core/load_test/memory.cpp @@ -6,7 +6,7 @@ namespace NKikimr { -class TMemoryTestLoadActor : public TActorBootstrapped<TMemoryTestLoadActor> { +class TMemoryLoadTestActor : public TActorBootstrapped<TMemoryLoadTestActor> { enum { EvAllocateBlock = EventSpaceBegin(TEvents::ES_PRIVATE), EvEnd @@ -30,7 +30,7 @@ public: return NKikimrServices::TActivity::BS_LOAD_PDISK_LOG_WRITE; } - TMemoryTestLoadActor(const NKikimr::TEvTestLoadRequest::TMemoryLoadStart& cmd, + TMemoryLoadTestActor(const NKikimr::TEvLoadTestRequest::TMemoryLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) @@ -52,9 +52,9 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag - << " TMemoryTestLoadActor Bootstrap called"); + << " TMemoryLoadTestActor Bootstrap called"); - Become(&TMemoryTestLoadActor::StateFunc); + Become(&TMemoryLoadTestActor::StateFunc); LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); @@ -68,9 +68,9 @@ public: LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " Handle PoisonPill"); - TIntrusivePtr<TLoadReport> report(new TLoadReport()); + TIntrusivePtr<TEvLoad::TLoadReport> report(new TEvLoad::TLoadReport()); report->Duration = Duration; - ctx.Send(Parent, new TEvTestLoadFinished(Tag, report, "OK")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, "OK")); Die(ctx); } @@ -128,14 +128,14 @@ public: ) }; -IActor* CreateMemoryTestLoad( - const NKikimr::TEvTestLoadRequest::TMemoryLoadStart& cmd, +IActor* CreateMemoryLoadTest( + const NKikimr::TEvLoadTestRequest::TMemoryLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TMemoryTestLoadActor(cmd, parent, counters, index, tag); + return new TMemoryLoadTestActor(cmd, parent, counters, index, tag); } } // NKikimr diff --git a/ydb/core/load_test/pdisk_log.cpp b/ydb/core/load_test/pdisk_log.cpp index b110dfa584..9a72f19907 100644 --- a/ydb/core/load_test/pdisk_log.cpp +++ b/ydb/core/load_test/pdisk_log.cpp @@ -8,7 +8,7 @@ #include <util/generic/queue.h> namespace NKikimr { -class TPDiskLogWriterTestLoadActor; +class TPDiskLogWriterLoadTestActor; #define VAR_OUT(x) #x "# " << x << "; " @@ -19,7 +19,7 @@ class TPDiskLogWriterTestLoadActor; } class TWorker { - friend class TPDiskLogWriterTestLoadActor; + friend class TPDiskLogWriterLoadTestActor; TVDiskID VDiskId; ui32 Idx; @@ -72,7 +72,7 @@ class TWorker { public: - TWorker(const NKikimr::TEvTestLoadRequest::TPDiskLogLoadStart::TWorkerConfig& cmd, + TWorker(const NKikimr::TEvLoadTestRequest::TPDiskLogLoadStart::TWorkerConfig& cmd, ui32 idx, TReallyFastRng32 *gen) : Idx(idx) , MaxInFlight(1, 0, 65536) @@ -255,7 +255,7 @@ public: } }; -class TPDiskLogWriterTestLoadActor : public TActorBootstrapped<TPDiskLogWriterTestLoadActor> { +class TPDiskLogWriterLoadTestActor : public TActorBootstrapped<TPDiskLogWriterLoadTestActor> { struct TRequestInfo { ui32 Size; TInstant LogStartTime; @@ -305,12 +305,12 @@ public: return NKikimrServices::TActivity::BS_LOAD_PDISK_LOG_WRITE; } - TPDiskLogWriterTestLoadActor(const NKikimr::TEvTestLoadRequest::TPDiskLogLoadStart& cmd, + TPDiskLogWriterLoadTestActor(const NKikimr::TEvLoadTestRequest::TPDiskLogLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) , Rng(Now().GetValue()) - // , Report(new TLoadReport()) + // , Report(new TEvLoad::TLoadReport()) { VERIFY_PARAM(PDiskId); PDiskId = cmd.GetPDiskId(); @@ -344,12 +344,12 @@ public: LogResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles); } - ~TPDiskLogWriterTestLoadActor() { + ~TPDiskLogWriterLoadTestActor() { LoadCounters->ResetCounters(); } void Bootstrap(const TActorContext& ctx) { - Become(&TPDiskLogWriterTestLoadActor::StateFunc); + Become(&TPDiskLogWriterLoadTestActor::StateFunc); LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); @@ -385,7 +385,7 @@ public: TStringStream str; str << "TEvYardInitResult is not OK, msg.ToString()# " << msg->ToString(); LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); Die(ctx); return; } @@ -461,7 +461,7 @@ public: } void StartDeathProcess(const TActorContext& ctx) { - Become(&TPDiskLogWriterTestLoadActor::StateEndOfWork); + Become(&TPDiskLogWriterLoadTestActor::StateEndOfWork); if (IsWardenlessTest) { for (auto& worker : Workers) { ++worker->OwnerRound; @@ -505,7 +505,7 @@ public: TStringStream str; str << "TEvYardInitResult is not OK, msg.ToString()# " << msg->ToString(); LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); Die(ctx); return; } @@ -549,7 +549,7 @@ public: TStringStream str; str << "TEvHarakiriResult is not OK, msg.ToString()# " << msg->ToString(); LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); Die(ctx); return; } @@ -563,11 +563,11 @@ public: << " GetReallyWrittenBytes()# " << worker->GetReallyWrittenBytes() << " GetGlobalWrittenBytes()# " << worker->GetGlobalWrittenBytes()); } - auto report = std::make_unique<TLoadReport>(); - report->LoadType = TLoadReport::LOAD_LOG_WRITE; + auto report = std::make_unique<TEvLoad::TLoadReport>(); + report->LoadType = TEvLoad::TLoadReport::LOAD_LOG_WRITE; report->Duration = TAppData::TimeProvider->Now() - TestStartTime; - ctx.Send(Parent, new TEvTestLoadFinished(Tag, report.release(), "OK")); - LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " End of work, TEvTestLoadFinished is sent"); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report.release(), "OK")); + LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " End of work, TEvLoadTestFinished is sent"); Die(ctx); } @@ -609,7 +609,7 @@ public: TStringStream str; str << " TEvLogResult is not OK, msg.ToString()# " << msg->ToString(); LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); Die(ctx); return; } @@ -693,9 +693,9 @@ public: ) }; -IActor *CreatePDiskLogWriterTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskLogLoadStart& cmd, +IActor *CreatePDiskLogWriterLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskLogLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TPDiskLogWriterTestLoadActor(cmd, parent, counters, index, tag); + return new TPDiskLogWriterLoadTestActor(cmd, parent, counters, index, tag); } } // NKikimr diff --git a/ydb/core/load_test/pdisk_read.cpp b/ydb/core/load_test/pdisk_read.cpp index 7d2901cb13..55f440d730 100644 --- a/ydb/core/load_test/pdisk_read.cpp +++ b/ydb/core/load_test/pdisk_read.cpp @@ -11,7 +11,7 @@ namespace NKikimr { -class TPDiskReaderTestLoadActor : public TActorBootstrapped<TPDiskReaderTestLoadActor> { +class TPDiskReaderLoadTestActor : public TActorBootstrapped<TPDiskReaderLoadTestActor> { struct TChunkInfo { TChunkIdx Idx; ui32 NumSlots; @@ -112,7 +112,7 @@ class TPDiskReaderTestLoadActor : public TActorBootstrapped<TPDiskReaderTestLoad NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes; TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters; - TIntrusivePtr<TLoadReport> Report; + TIntrusivePtr<TEvLoad::TLoadReport> Report; TIntrusivePtr<NMonitoring::TCounterForPtr> PDiskBytesRead; TMap<double, TIntrusivePtr<NMonitoring::TCounterForPtr>> DevicePercentiles; @@ -122,14 +122,14 @@ public: return NKikimrServices::TActivity::BS_LOAD_PDISK_READ; } - TPDiskReaderTestLoadActor(const NKikimr::TEvTestLoadRequest::TPDiskReadLoadStart& cmd, const TActorId& parent, + TPDiskReaderLoadTestActor(const NKikimr::TEvLoadTestRequest::TPDiskReadLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) , MaxInFlight(4, 0, 65536) , OwnerRound(1000 + index) , Rng(Now().GetValue()) - , Report(new TLoadReport()) + , Report(new TEvLoad::TLoadReport()) { ErrorReason = "Still waiting for TEvRegisterPDiskLoadActorResult"; @@ -192,12 +192,12 @@ public: } } - ~TPDiskReaderTestLoadActor() { + ~TPDiskReaderLoadTestActor() { LoadCounters->ResetCounters(); } void Bootstrap(const TActorContext& ctx) { - Become(&TPDiskReaderTestLoadActor::StateFunc); + Become(&TPDiskReaderLoadTestActor::StateFunc); ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill()); ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); AppData(ctx)->Icb->RegisterLocalControl(MaxInFlight, Sprintf("PDiskReadLoadActor_MaxInFlight_%4" PRIu64, Tag).c_str()); @@ -310,7 +310,7 @@ public: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void HandlePoisonPill(const TActorContext& ctx) { - Report->LoadType = TLoadReport::LOAD_READ; + Report->LoadType = TEvLoad::TLoadReport::LOAD_READ; MaxInFlight = 0; CheckDie(ctx); } @@ -321,14 +321,14 @@ public: SendRequest(ctx, std::make_unique<NPDisk::TEvHarakiri>(PDiskParams->Owner, PDiskParams->OwnerRound)); Harakiri = true; } else { - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, ErrorReason)); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, ErrorReason)); Die(ctx); } } } void Handle(NPDisk::TEvHarakiriResult::TPtr& /*ev*/, const TActorContext& ctx) { - ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, ErrorReason)); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, ErrorReason)); Die(ctx); } @@ -585,10 +585,10 @@ public: ) }; -IActor *CreatePDiskReaderTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskReadLoadStart& cmd, +IActor *CreatePDiskReaderLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskReadLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TPDiskReaderTestLoadActor(cmd, parent, counters, index, tag); + return new TPDiskReaderLoadTestActor(cmd, parent, counters, index, tag); } } // NKikimr diff --git a/ydb/core/load_test/pdisk_write.cpp b/ydb/core/load_test/pdisk_write.cpp index 8678a3344a..21ef38a153 100644 --- a/ydb/core/load_test/pdisk_write.cpp +++ b/ydb/core/load_test/pdisk_write.cpp @@ -9,7 +9,7 @@ namespace NKikimr { -class TPDiskWriterTestLoadActor : public TActorBootstrapped<TPDiskWriterTestLoadActor> { +class TPDiskWriterLoadTestActor : public TActorBootstrapped<TPDiskWriterLoadTestActor> { struct TChunkInfo { TDeque<std::pair<TChunkIdx, ui32>> WriteQueue; ui32 NumSlots; @@ -96,7 +96,7 @@ class TPDiskWriterTestLoadActor : public TActorBootstrapped<TPDiskWriterTestLoad TBuffer DataBuffer; ui64 Lsn = 1; TChunkInfo *ReservePending = nullptr; - NKikimr::TEvTestLoadRequest::ELogMode LogMode; + NKikimr::TEvLoadTestRequest::ELogMode LogMode; THashMap<TChunkIdx, ui32> ChunkUsageCount; TQueue<TChunkIdx> AllocationQueue; TMultiMap<TInstant, TRequestStat> TimeSeries; @@ -123,7 +123,7 @@ class TPDiskWriterTestLoadActor : public TActorBootstrapped<TPDiskWriterTestLoad NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes; NMonitoring::TPercentileTrackerLg<6, 5, 15> LogResponseTimes; - TIntrusivePtr<TLoadReport> Report; + TIntrusivePtr<TEvLoad::TLoadReport> Report; TIntrusivePtr<NMonitoring::TCounterForPtr> PDiskBytesWritten; TMap<double, TIntrusivePtr<NMonitoring::TCounterForPtr>> DevicePercentiles; @@ -132,14 +132,14 @@ public: return NKikimrServices::TActivity::BS_LOAD_PDISK_WRITE; } - TPDiskWriterTestLoadActor(const NKikimr::TEvTestLoadRequest::TPDiskLoadStart& cmd, const TActorId& parent, + TPDiskWriterLoadTestActor(const NKikimr::TEvLoadTestRequest::TPDiskLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) , MaxInFlight(4, 0, 65536) , OwnerRound(1000 + index) , Rng(Now().GetValue()) - , Report(new TLoadReport()) + , Report(new TEvLoad::TLoadReport()) { VERIFY_PARAM(DurationSeconds); @@ -208,12 +208,12 @@ public: } } - ~TPDiskWriterTestLoadActor() { + ~TPDiskWriterLoadTestActor() { LoadCounters->ResetCounters(); } void Bootstrap(const TActorContext& ctx) { - Become(&TPDiskWriterTestLoadActor::StateFunc); + Become(&TPDiskWriterLoadTestActor::StateFunc); ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); AppData(ctx)->Icb->RegisterLocalControl(MaxInFlight, Sprintf("PDiskWriteLoadActor_MaxInFlight_%4" PRIu64, Tag).c_str()); @@ -235,7 +235,7 @@ public: TStringStream str; str << "yard init failed, Status# " << NKikimrProto::EReplyStatus_Name(msg->Status); LOG_INFO(ctx, NKikimrServices::BS_LOAD_TEST, "%s", str.Str().c_str()); - ctx.Send(Parent, new TEvTestLoadFinished(Tag, nullptr, str.Str())); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str())); Die(ctx); return; } @@ -324,7 +324,7 @@ public: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void HandlePoisonPill(const TActorContext& ctx) { - Report->LoadType = TLoadReport::LOAD_WRITE; + Report->LoadType = TEvLoad::TLoadReport::LOAD_WRITE; MaxInFlight = 0; CheckDie(ctx); } @@ -335,14 +335,14 @@ public: SendRequest(ctx, std::make_unique<NPDisk::TEvHarakiri>(PDiskParams->Owner, PDiskParams->OwnerRound)); Harakiri = true; } else { - ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK, but can't send TEvHarakiri to PDisk")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK, but can't send TEvHarakiri to PDisk")); Die(ctx); } } } void Handle(NPDisk::TEvHarakiriResult::TPtr& /*ev*/, const TActorContext& ctx) { - ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK")); + ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK")); Die(ctx); } @@ -427,7 +427,7 @@ public: ui32 offset = slotIndex * size; const TInstant now = TAppData::TimeProvider->Now(); // like the parallel mode, but log is treated already written - bool isLogWritten = (LogMode == NKikimr::TEvTestLoadRequest::LOG_NONE); + bool isLogWritten = (LogMode == NKikimr::TEvLoadTestRequest::LOG_NONE); ui64 requestIdx = NewTRequestInfo(size, chunkIdx, now, now, false, isLogWritten); SendRequest(ctx, std::make_unique<NPDisk::TEvChunkWrite>(PDiskParams->Owner, PDiskParams->OwnerRound, chunkIdx, offset, @@ -435,7 +435,7 @@ public: reinterpret_cast<void*>(requestIdx), true, NPriWrite::HullHugeAsyncBlob, Sequential)); ++ChunkWrite_RequestsSent; - if (LogMode == NKikimr::TEvTestLoadRequest::LOG_PARALLEL) { + if (LogMode == NKikimr::TEvLoadTestRequest::LOG_PARALLEL) { SendLogRequest(ctx, requestIdx, chunkIdx); } @@ -461,10 +461,10 @@ public: if (info->LogWritten) { // both data and log are written, this could happen only in LOG_PARALLEL mode; this request is done FinishRequest(ctx, requestIdx); - } else if (LogMode == NKikimr::TEvTestLoadRequest::LOG_SEQUENTIAL) { + } else if (LogMode == NKikimr::TEvLoadTestRequest::LOG_SEQUENTIAL) { // in sequential mode we send log request after completion of data write request SendLogRequest(ctx, requestIdx, msg->ChunkIdx); - } else if (LogMode == NKikimr::TEvTestLoadRequest::LOG_PARALLEL) { + } else if (LogMode == NKikimr::TEvLoadTestRequest::LOG_PARALLEL) { // this is parallel mode and log is not written yet, so request is not complete; we release it to avoid // being deleted } @@ -639,9 +639,9 @@ public: ) }; -IActor *CreatePDiskWriterTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskLoadStart& cmd, +IActor *CreatePDiskWriterLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TPDiskWriterTestLoadActor(cmd, parent, counters, index, tag); + return new TPDiskWriterLoadTestActor(cmd, parent, counters, index, tag); } } // NKikimr diff --git a/ydb/core/load_test/service_actor.cpp b/ydb/core/load_test/service_actor.cpp index 2cbd022192..ac8c6ec1d6 100644 --- a/ydb/core/load_test/service_actor.cpp +++ b/ydb/core/load_test/service_actor.cpp @@ -76,7 +76,7 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> { ui64 NextTag = 1; // queue for all-nodes load - TVector<NKikimr::TEvTestLoadRequest> AllNodesLoadConfigs; + TVector<NKikimr::TEvLoadTestRequest> AllNodesLoadConfigs; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; @@ -94,7 +94,7 @@ public: Become(&TLoadActor::StateFunc); } - void Handle(TEvLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoad::TEvLoadTestRequest::TPtr& ev, const TActorContext& ctx) { ui32 status = NMsgBusProxy::MSTATUS_OK; TString error; ui64 tag = 0; @@ -107,7 +107,7 @@ public: status = NMsgBusProxy::MSTATUS_ERROR; error = ex.what(); } - auto response = std::make_unique<TEvLoad::TEvTestLoadResponse>(); + auto response = std::make_unique<TEvLoad::TEvLoadTestResponse>(); response->Record.SetStatus(status); if (error) { response->Record.SetErrorReason(error); @@ -128,22 +128,22 @@ public: } } - ui64 ProcessCmd(const NKikimr::TEvTestLoadRequest& record, const TActorContext& ctx) { + ui64 ProcessCmd(const NKikimr::TEvLoadTestRequest& record, const TActorContext& ctx) { ui64 tag = 0; switch (record.Command_case()) { - case NKikimr::TEvTestLoadRequest::CommandCase::kLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kLoadStart: { const auto& cmd = record.GetLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateWriterTestLoad(cmd, ctx.SelfID, + LoadActors.emplace(tag, ctx.Register(CreateWriterLoadTest(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kLoadStop: { + case NKikimr::TEvLoadTestRequest::CommandCase::kLoadStop: { const auto& cmd = record.GetLoadStop(); if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Delete all running load actors"); @@ -165,54 +165,54 @@ public: break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kPDiskLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kPDiskLoadStart: { const auto& cmd = record.GetPDiskLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreatePDiskWriterTestLoad( + LoadActors.emplace(tag, ctx.Register(CreatePDiskWriterLoadTest( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kPDiskReadLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kPDiskReadLoadStart: { const auto& cmd = record.GetPDiskReadLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreatePDiskReaderTestLoad( + LoadActors.emplace(tag, ctx.Register(CreatePDiskReaderLoadTest( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kPDiskLogLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kPDiskLogLoadStart: { const auto& cmd = record.GetPDiskLogLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreatePDiskLogWriterTestLoad( + LoadActors.emplace(tag, ctx.Register(CreatePDiskLogWriterLoadTest( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kVDiskLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kVDiskLoadStart: { const auto& cmd = record.GetVDiskLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag); } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateVDiskWriterTestLoad(cmd, ctx.SelfID, tag))); + LoadActors.emplace(tag, ctx.Register(CreateVDiskWriterLoadTest(cmd, ctx.SelfID, tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kKeyValueLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kKeyValueLoadStart: { const auto& cmd = record.GetKeyValueLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { @@ -220,12 +220,12 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateKeyValueWriterTestLoad( + LoadActors.emplace(tag, ctx.Register(CreateKeyValueWriterLoadTest( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kKqpLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kKqpLoadStart: { const auto& cmd = record.GetKqpLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { @@ -238,7 +238,7 @@ public: break; } - case NKikimr::TEvTestLoadRequest::CommandCase::kMemoryLoadStart: { + case NKikimr::TEvLoadTestRequest::CommandCase::kMemoryLoadStart: { const auto& cmd = record.GetMemoryLoadStart(); tag = GetOrGenerateTag(cmd); if (LoadActors.count(tag) != 0) { @@ -246,7 +246,7 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new memory load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateMemoryTestLoad( + LoadActors.emplace(tag, ctx.Register(CreateMemoryLoadTest( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } @@ -255,7 +255,7 @@ public: TString protoTxt; google::protobuf::TextFormat::PrintToString(record, &protoTxt); ythrow TLoadActorException() << (TStringBuilder() - << "TLoadActor::Handle(TEvLoad::TEvTestLoadRequest): unexpected command case: " + << "TLoadActor::Handle(TEvLoad::TEvLoadTestRequest): unexpected command case: " << ui32(record.Command_case()) << " protoTxt# " << protoTxt.Quote()); } @@ -263,7 +263,7 @@ public: return tag; } - void Handle(TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoad::TEvLoadTestFinished::TPtr& ev, const TActorContext& ctx) { const auto& msg = ev->Get(); auto iter = LoadActors.find(msg->Tag); Y_VERIFY(iter != LoadActors.end()); @@ -328,7 +328,7 @@ public: if (params.Has("protobuf")) { TString errorMsg = "ok"; - NKikimr::TEvTestLoadRequest record; + NKikimr::TEvLoadTestRequest record; bool status = google::protobuf::TextFormat::ParseFromString(params.Get("protobuf"), &record); LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "received protobuf: " << params.Get("protobuf") << " | " @@ -356,7 +356,7 @@ public: return; } else if (params.Has("stop_request")) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "received stop request"); - NKikimr::TEvTestLoadRequest record; + NKikimr::TEvLoadTestRequest record; record.MutableLoadStop()->SetRemoveAllTags(true); if (params.Has("all_nodes") && params.Get("all_nodes") == "true") { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "stop load on all nodes"); @@ -398,7 +398,7 @@ public: for (const auto& cmd : AllNodesLoadConfigs) { for (const auto& id : dyn_node_ids) { LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "sending load request to: " << id); - auto msg = MakeHolder<TEvLoad::TEvTestLoadRequest>(); + auto msg = MakeHolder<TEvLoad::TEvLoadTestRequest>(); msg->Record = cmd; msg->Record.SetCookie(id); ctx.Send(MakeLoadServiceID(id), msg.Release()); @@ -582,15 +582,15 @@ public: } STRICT_STFUNC(StateFunc, - HFunc(TEvLoad::TEvTestLoadRequest, Handle) - HFunc(TEvTestLoadFinished, Handle) + HFunc(TEvLoad::TEvLoadTestRequest, Handle) + HFunc(TEvLoad::TEvLoadTestFinished, Handle) HFunc(NMon::TEvHttpInfo, Handle) HFunc(NMon::TEvHttpInfoRes, Handle) HFunc(TEvInterconnect::TEvNodesInfo, Handle) ) }; -IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { +IActor *CreateLoadTestActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { return new TLoadActor(counters); } diff --git a/ydb/core/load_test/service_actor.h b/ydb/core/load_test/service_actor.h index 5c91a5ab4a..88e11369f1 100644 --- a/ydb/core/load_test/service_actor.h +++ b/ydb/core/load_test/service_actor.h @@ -3,9 +3,6 @@ #include "defs.h" #include <ydb/core/base/blobstorage.h> -#include <library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h> -#include <library/cpp/json/writer/json_value.h> - #include <cmath> namespace NKikimr { @@ -27,108 +24,41 @@ namespace NKikimr { struct TEvUpdateMonitoring : TEventLocal<TEvUpdateMonitoring, EvUpdateMonitoring> {}; + class TLoadActorException : public yexception + {}; - class TLoadActorException : public yexception { - }; - - NActors::IActor *CreateTestLoadActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); + NActors::IActor *CreateLoadTestActor(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); - NActors::IActor *CreateWriterTestLoad(const NKikimr::TEvTestLoadRequest::TLoadStart& cmd, + NActors::IActor *CreateWriterLoadTest(const NKikimr::TEvLoadTestRequest::TLoadStart& cmd, const NActors::TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag); - NActors::IActor *CreatePDiskWriterTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskLoadStart& cmd, + NActors::IActor *CreatePDiskWriterLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreatePDiskLogWriterTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskLogLoadStart& cmd, + NActors::IActor *CreatePDiskLogWriterLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskLogLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreatePDiskReaderTestLoad(const NKikimr::TEvTestLoadRequest::TPDiskReadLoadStart& cmd, + NActors::IActor *CreatePDiskReaderLoadTest(const NKikimr::TEvLoadTestRequest::TPDiskReadLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreateVDiskWriterTestLoad(const NKikimr::TEvTestLoadRequest::TVDiskLoadStart& cmd, + NActors::IActor *CreateVDiskWriterLoadTest(const NKikimr::TEvLoadTestRequest::TVDiskLoadStart& cmd, const NActors::TActorId& parent, ui64 tag); - NActors::IActor *CreateKeyValueWriterTestLoad(const NKikimr::TEvTestLoadRequest::TKeyValueLoadStart& cmd, + NActors::IActor *CreateKeyValueWriterLoadTest(const NKikimr::TEvLoadTestRequest::TKeyValueLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreateKqpLoadActor(const NKikimr::TEvTestLoadRequest::TKqpLoadStart& cmd, + NActors::IActor *CreateKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreateMemoryTestLoad(const NKikimr::TEvTestLoadRequest::TMemoryLoadStart& cmd, + NActors::IActor *CreateMemoryLoadTest(const NKikimr::TEvLoadTestRequest::TMemoryLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - struct TLoadReport : public TThrRefBase { - enum ELoadType { - LOAD_READ, - LOAD_WRITE, - LOAD_LOG_WRITE, - }; - - TDuration Duration; - ui64 Size; - ui32 InFlight; - TVector<ui64> RwSpeedBps; - ELoadType LoadType; - NMonitoring::TPercentileTrackerLg<10, 4, 1> LatencyUs; // Upper threshold of this tracker is ~134 seconds, size is 256kB - TMap<double, ui64> DeviceLatency; - - double GetAverageSpeed() const { - if (RwSpeedBps.size() < 1) { - return 0; - } - double avg = 0; - for (const ui64& speed : RwSpeedBps) { - avg += speed; - } - avg /= RwSpeedBps.size(); - return avg; - } - - double GetSpeedDeviation() const { - if (RwSpeedBps.size() <= 1) { - return 0; - } - i64 avg = (i64)GetAverageSpeed(); - double sd = 0; - for (const ui64& speed : RwSpeedBps) { - sd += ((i64)speed - avg) * ((i64)speed - avg); - } - sd /= RwSpeedBps.size(); - return std::sqrt(sd); - } - - TString LoadTypeName() const { - switch (LoadType) { - case LOAD_READ: - return "read"; - case LOAD_WRITE: - return "write"; - case LOAD_LOG_WRITE: - return "log_write"; - } - } - }; - - struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, TEvBlobStorage::EvTestLoadFinished> { - ui64 Tag; - TIntrusivePtr<TLoadReport> Report; // nullptr indicates error - TString ErrorReason; - TString LastHtmlPage; - NJson::TJsonValue JsonResult; - - TEvTestLoadFinished(ui64 tag, TIntrusivePtr<TLoadReport> report, TString errorReason) - : Tag(tag) - , Report(report) - , ErrorReason(errorReason) - {} - }; - #define VERIFY_PARAM2(FIELD, NAME) \ do { \ if (!(FIELD).Has##NAME()) { \ diff --git a/ydb/core/load_test/size_gen.h b/ydb/core/load_test/size_gen.h index 8c1aa943c8..53136b2926 100644 --- a/ydb/core/load_test/size_gen.h +++ b/ydb/core/load_test/size_gen.h @@ -10,7 +10,7 @@ namespace NKikimr { ui32 Min; ui32 Max; - TItem(const NKikimr::TEvTestLoadRequest::TSizeInfo& x) + TItem(const NKikimr::TEvLoadTestRequest::TSizeInfo& x) : Min(x.GetMin()) , Max(x.GetMax()) { diff --git a/ydb/core/load_test/vdisk_write.cpp b/ydb/core/load_test/vdisk_write.cpp index de06533b00..3152528d88 100644 --- a/ydb/core/load_test/vdisk_write.cpp +++ b/ydb/core/load_test/vdisk_write.cpp @@ -76,7 +76,7 @@ namespace NKikimr { return NKikimrServices::TActivity::BS_LOAD_PDISK_WRITE; } - TVDiskLoadActor(const NKikimr::TEvTestLoadRequest::TVDiskLoadStart& cmd, + TVDiskLoadActor(const NKikimr::TEvLoadTestRequest::TVDiskLoadStart& cmd, const NActors::TActorId& parent, ui64 tag) : ParentActorId(parent) , Tag(tag) @@ -144,7 +144,7 @@ namespace NKikimr { void HandlePoison(const TActorContext& ctx) { ctx.Send(QueueActorId, new TEvents::TEvPoisonPill); - ctx.Send(ParentActorId, new TEvTestLoadFinished(Tag, nullptr, "Poison pill")); + ctx.Send(ParentActorId, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, "Poison pill")); Die(ctx); } @@ -342,7 +342,7 @@ namespace NKikimr { } // <anonymous> - IActor *CreateVDiskWriterTestLoad(const NKikimr::TEvTestLoadRequest::TVDiskLoadStart& cmd, + IActor *CreateVDiskWriterLoadTest(const NKikimr::TEvLoadTestRequest::TVDiskLoadStart& cmd, const NActors::TActorId& parent, ui64 tag) { return new TVDiskLoadActor(cmd, parent, tag); } diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt index 0e11f0f11b..74817f4935 100644 --- a/ydb/core/protos/CMakeLists.txt +++ b/ydb/core/protos/CMakeLists.txt @@ -90,6 +90,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/kqp_stats.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/kqp.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/labeled_counters.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/load_test.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/local.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/long_tx_service.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/metrics.proto @@ -129,7 +130,6 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/tenant_pool.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/tenant_slot_broker.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/test_shard.proto - ${CMAKE_SOURCE_DIR}/ydb/core/protos/testload.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/tracing.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/node_whiteboard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/tx.proto diff --git a/ydb/core/protos/testload.proto b/ydb/core/protos/load_test.proto index 3002926330..c775fa5848 100644 --- a/ydb/core/protos/testload.proto +++ b/ydb/core/protos/load_test.proto @@ -4,7 +4,7 @@ import "ydb/core/protos/blobstorage_disk.proto"; package NKikimr; option java_package = "ru.yandex.kikimr.proto"; -message TEvTestLoadRequest { +message TEvLoadTestRequest { // an item for interval distribution setting message TIntervalInfo { message TIntervalUniform { @@ -242,7 +242,7 @@ message TEvTestLoadRequest { } } -message TEvTestLoadResponse { +message TEvLoadTestResponse { optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h optional string ErrorReason = 2; optional uint64 Cookie = 3; diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index cc633a7a03..81e5d01d7d 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -22,7 +22,7 @@ import "ydb/core/protos/sqs.proto"; import "ydb/core/protos/issue_id.proto"; import "ydb/core/protos/query_stats.proto"; import "ydb/core/protos/subdomains.proto"; -import "ydb/core/protos/testload.proto"; +import "ydb/core/protos/load_test.proto"; import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/draft/persqueue_error_codes.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; @@ -481,7 +481,7 @@ message TDsTestLoadResponse { message TBsTestLoadRequest { repeated uint32 NodeId = 1; - optional NKikimr.TEvTestLoadRequest Event = 2; + optional NKikimr.TEvLoadTestRequest Event = 2; }; message TBsTestLoadResponse { |