diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-01-09 22:20:31 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-01-09 22:20:31 +0300 |
commit | 686f4f5fb194fae77e30db2dadf9f9166c65d1de (patch) | |
tree | c8e01081be7fe0ff8b0c7d631b5bf932c5feaa92 | |
parent | 71dae704b988f15eee2751c291c415263c9b35e3 (diff) | |
download | ydb-686f4f5fb194fae77e30db2dadf9f9166c65d1de.tar.gz |
cleanup tag ussage in subactors (use special subId)
-rw-r--r-- | ydb/core/load_test/ycsb/actors.h | 10 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 53 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_upsert.cpp | 71 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_actor.cpp | 23 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_actor.h | 12 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 110 |
6 files changed, 158 insertions, 121 deletions
diff --git a/ydb/core/load_test/ycsb/actors.h b/ydb/core/load_test/ycsb/actors.h index 542bb27eab..759bc60f05 100644 --- a/ydb/core/load_test/ycsb/actors.h +++ b/ydb/core/load_test/ycsb/actors.h @@ -12,35 +12,35 @@ IActor *CreateUpsertBulkActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag); + const TSubLoadId& id); IActor *CreateLocalMkqlUpsertActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag); + const TSubLoadId& id); IActor *CreateKqpUpsertActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag); + const TSubLoadId& id); IActor *CreateProposeUpsertActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag); + const TSubLoadId& id); IActor *CreateReadIteratorActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag); + const TSubLoadId& id); class TLoadManagerException : public yexception { }; diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index 17a9e74ceb..fd5e4613f6 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -128,7 +128,7 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> { const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config; const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; const TActorId Parent; - const ui64 Tag; + const TSubLoadId Id; const ERequestType RequestType; TString ConfingString; @@ -152,12 +152,12 @@ public: const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag, + const TSubLoadId& id, ERequestType requestType) : Config(cmd) , Target(target) , Parent(parent) - , Tag(tag) + , Id(id) , RequestType(requestType) { Y_UNUSED(counters); @@ -165,7 +165,7 @@ public: } void Bootstrap(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Bootstrap called: " << ConfingString); // note that we generate all requests at once to send at max speed, i.e. @@ -184,14 +184,14 @@ public: private: void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag << " TUpsertActor Connect called"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called"); Pipe = Register(NTabletPipe::CreateClient(SelfId(), Target.GetTabletId())); } void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { @@ -206,7 +206,7 @@ private: } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Handle TEvClientDestroyed called"); StopWithError(ctx, "broken pipe"); } @@ -214,8 +214,8 @@ private: void SendRows(const TActorContext &ctx) { while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) { const auto* request = Requests[CurrentRequest].get(); - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag - << "TUpsertActor# " << Tag << " send request# " << CurrentRequest << ": " << request->ToString()); + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id + << "TUpsertActor# " << Id << " send request# " << CurrentRequest << ": " << request->ToString()); NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release()); ++CurrentRequest; ++Inflight; @@ -229,30 +229,30 @@ private: EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); auto& report = *response->Record.MutableReport(); - report.SetTag(Tag); + report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); report.SetOperationsOK(Requests.size() - Errors); report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor finished in " << delta << ", errors=" << Errors); Die(ctx); } } void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr ev, const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record); --Inflight; TEvDataShard::TEvUploadRowsResponse *msg = ev->Get(); if (msg->Record.GetStatus() != 0) { ++Errors; - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor TEvUploadRowsResponse: " << msg->ToString()); } @@ -260,14 +260,14 @@ private: } void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr ev, const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record); --Inflight; TEvTablet::TEvLocalMKQLResponse *msg = ev->Get(); if (msg->Record.GetStatus() != 0) { ++Errors; - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor TEvLocalMKQLResponse: " << msg->ToString()); } @@ -281,7 +281,7 @@ private: void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { TStringStream str; HTML(str) { - str << "DS bulk upsert load actor# " << Tag << " started on " << StartTs + str << "DS bulk upsert load actor# " << Id << " started on " << StartTs << " sent " << CurrentRequest << " out of " << Requests.size(); TInstant ts = EndTs ? EndTs : TInstant::Now(); auto delta = ts - StartTs; @@ -290,7 +290,7 @@ private: << " errors=" << Errors; } - ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str())); + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); } void HandlePoison(const TActorContext& ctx) { @@ -301,7 +301,7 @@ private: void StopWithError(const TActorContext& ctx, const TString& reason) { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); - ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); } @@ -321,27 +321,30 @@ private: IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, - const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) + const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id) { - return new TUpsertActor(cmd, target, parent, std::move(counters), tag, ERequestType::UpsertBulk); + return new TUpsertActor(cmd, target, parent, std::move(counters), id, ERequestType::UpsertBulk); } IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, - const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) + const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id) { - return new TUpsertActor(cmd, target, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql); + return new TUpsertActor(cmd, target, parent, std::move(counters), id, ERequestType::UpsertLocalMkql); } IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, - const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag) + const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TSubLoadId& id) { Y_UNUSED(cmd); Y_UNUSED(target); Y_UNUSED(parent); Y_UNUSED(counters); - Y_UNUSED(tag); + Y_UNUSED(id); return nullptr; // not yet implemented } diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp index ecfd3c28a0..54f2217c9a 100644 --- a/ydb/core/load_test/ycsb/kqp_upsert.cpp +++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp @@ -72,14 +72,13 @@ TQueryInfo GenerateUpsert(size_t n, const TString& table) { return TQueryInfo(str.Str(), std::move(params)); } - // it's a partial copy-paste from TUpsertActor: logic slightly differs, so that // it seems better to have copy-paste rather if/else for different loads class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> { const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config; const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; const TActorId Parent; - const ui64 Tag; + const TSubLoadId Id; const TString Database; TString ConfingString; @@ -99,12 +98,12 @@ public: const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag, + const TSubLoadId& id, TRequestsVector requests) : Config(cmd) , Target(target) , Parent(parent) - , Tag(tag) + , Id(id) , Database(Target.GetWorkingDir()) , Requests(std::move(requests)) { @@ -113,7 +112,7 @@ public: } void Bootstrap(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " Bootstrap called: " << ConfingString); Become(&TKqpUpsertActor::StateFunc); @@ -123,7 +122,7 @@ public: private: void CreateSession(const TActorContext& ctx) { auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " sends event for session creation to proxy: " << kqpProxy.ToString()); auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); @@ -136,7 +135,7 @@ private: return; auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " sends session close query to proxy: " << kqpProxy); auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); @@ -150,7 +149,7 @@ private: request->Record.MutableRequest()->SetSessionId(Session); auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " send request# " << CurrentRequest << " to proxy# " << kqpProxy << ": " << request->ToString()); @@ -168,23 +167,24 @@ private: EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); auto& report = *response->Record.MutableReport(); - report.SetTag(Tag); + report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); report.SetOperationsOK(Requests.size() - Errors); report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " finished in " << delta << ", errors=" << Errors); Die(ctx); } } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag << " tablet recieved PoisonPill, going to die"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id + << " tablet recieved PoisonPill, going to die"); CloseSession(ctx); Die(ctx); } @@ -194,7 +194,7 @@ private: if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { Session = response.GetResponse().GetSessionId(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag << " session: " << Session); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " session: " << Session); SendRows(ctx); } else { StopWithError(ctx, "failed to create session: " + ev->Get()->ToString()); @@ -202,7 +202,7 @@ private: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " received from " << ev->Sender << ": " << ev->Get()->Record.DebugString()); --Inflight; @@ -217,14 +217,14 @@ private: void StopWithError(const TActorContext& ctx, const TString& reason) { LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); - ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); Die(ctx); } void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { TStringStream str; HTML(str) { - str << "TKqpUpsertActor# " << Tag << " started on " << StartTs + str << "TKqpUpsertActor# " << Id << " started on " << StartTs << " sent " << CurrentRequest << " out of " << Requests.size(); TInstant ts = EndTs ? EndTs : TInstant::Now(); auto delta = ts - StartTs; @@ -233,7 +233,7 @@ private: << " errors=" << Errors; } - ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str())); + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); } STRICT_STFUNC(StateFunc, @@ -249,12 +249,13 @@ class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMul const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config; const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; const TActorId Parent; - const ui64 Tag; + const TSubLoadId Id; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; const TString Database; TString ConfingString; + ui64 LastSubTag = 0; TVector<TActorId> Actors; size_t Inflight = 0; @@ -270,11 +271,11 @@ public: const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag) + const TSubLoadId& id) : Config(cmd) , Target(target) , Parent(parent) - , Tag(tag) + , Id(id) , Counters(counters) , Database(target.GetWorkingDir()) { @@ -283,7 +284,7 @@ public: } void Bootstrap(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " Bootstrap called: " << ConfingString); Become(&TKqpUpsertActorMultiSession::StateFunc); @@ -331,7 +332,7 @@ private: Actors.reserve(actorsCount); Inflight = actorsCount; for (size_t i = 0; i < actorsCount; ++i) { - ui32 pseudoTag = 1000000 + i; + TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); auto configCopy = Config; configCopy.SetInflight(1); // we have only 1 session configCopy.SetRowCount(requestsPerActor); @@ -341,12 +342,12 @@ private: Target, SelfId(), Counters, - pseudoTag, + subId, std::move(perActorRequests[i])); Actors.emplace_back(ctx.Register(kqpActor)); } - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " started# " << actorsCount << " actors each with inflight# " << requestsPerActor); } @@ -362,7 +363,7 @@ private: return; } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << ev->Get()->ToString()); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString()); Errors += record.GetReport().GetOperationsError(); Oks += record.GetReport().GetOperationsOK(); @@ -372,15 +373,15 @@ private: EndTs = TInstant::Now(); auto delta = EndTs - StartTs; - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); auto& report = *response->Record.MutableReport(); - report.SetTag(Tag); + report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); report.SetOperationsOK(Oks); report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " finished in " << delta << ", oks# " << Oks << ", errors# " << Errors); Stop(ctx); @@ -390,22 +391,22 @@ private: void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { TStringStream str; HTML(str) { - str << "TKqpUpsertActorMultiSession# " << Tag << " started on " << StartTs; + str << "TKqpUpsertActorMultiSession# " << Id << " started on " << StartTs; } - ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str())); + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " tablet recieved PoisonPill, going to die"); Stop(ctx); } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " stopped with error: " << reason); - ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); Stop(ctx); } @@ -431,9 +432,9 @@ IActor *CreateKqpUpsertActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag) + const TSubLoadId& id) { - return new TKqpUpsertActorMultiSession(cmd, target, parent, std::move(counters), tag); + return new TKqpUpsertActorMultiSession(cmd, target, parent, std::move(counters), id); } } // NKikimr::NDataShardLoad diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp index 1bc0d52491..4ae6a2994b 100644 --- a/ydb/core/load_test/ycsb/test_load_actor.cpp +++ b/ydb/core/load_test/ycsb/test_load_actor.cpp @@ -49,7 +49,7 @@ private: TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; EState State = EState::Init; - ui64 LastTag; + ui64 LastTag = 0; TInstant StartTs; THashSet<TActorId> LoadActors; @@ -74,7 +74,6 @@ public: , Tag(tag) , Request(std::move(request)) , Counters(counters) - , LastTag(tag) {} void Bootstrap(const TActorContext& ctx) { @@ -346,7 +345,7 @@ public: target, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), - ++LastTag))); + TSubLoadId(Tag, ctx.SelfID, ++LastTag)))); } void RunLoad(const TActorContext& ctx) { @@ -362,7 +361,7 @@ public: Request.GetTargetShard(), ctx.SelfID, counters, - tag)); + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: actor.reset(CreateLocalMkqlUpsertActor( @@ -370,7 +369,7 @@ public: Request.GetTargetShard(), ctx.SelfID, counters, - tag)); + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertKqpStart: actor.reset(CreateKqpUpsertActor( @@ -378,7 +377,7 @@ public: Request.GetTargetShard(), ctx.SelfID, counters, - tag)); + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertProposeStart: actor.reset(CreateProposeUpsertActor( @@ -386,7 +385,7 @@ public: Request.GetTargetShard(), ctx.SelfID, counters, - tag)); + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadIteratorStart: actor.reset(CreateReadIteratorActor( @@ -394,7 +393,7 @@ public: Request.GetTargetShard(), ctx.SelfID, counters, - tag)); + TSubLoadId(Tag, ctx.SelfID, ++LastTag))); break; default: { TStringStream ss; @@ -613,3 +612,11 @@ inline void Out<NKikimr::NDataShardLoad::TLoad::EState>( break; } } + +template <> +void Out<NKikimr::NDataShardLoad::TSubLoadId>( + IOutputStream& o, + const NKikimr::NDataShardLoad::TSubLoadId& loadId) +{ + o << "{Tag: " << loadId.Tag << ", parent: " << loadId.Parent << ", subTag: " << loadId.SubTag << "}"; +} diff --git a/ydb/core/load_test/ycsb/test_load_actor.h b/ydb/core/load_test/ycsb/test_load_actor.h index ff8e95e59f..556f3c5b92 100644 --- a/ydb/core/load_test/ycsb/test_load_actor.h +++ b/ydb/core/load_test/ycsb/test_load_actor.h @@ -95,5 +95,17 @@ IActor *CreateTestLoadActor( const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 tag); +struct TSubLoadId { + ui64 Tag = 0; // tag assigned to the "main" load actor (i.e. controlling the load) by service actor + TActorId Parent; // parent of the subload + ui64 SubTag = 0; + + TSubLoadId(ui64 tag, const TActorId& parent, ui64 subTag = 0) + : Tag(tag) + , Parent(parent) + , SubTag(subTag) + {} +}; + } // NDataShardLoad } // NKikimr diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 05ad9a63a5..0cc161deb3 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -93,6 +93,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { const NKikimrTxDataShard::EScanDataFormat Format; const ui64 TabletId; const TActorId Parent; + const TSubLoadId Id; TActorId Pipe; @@ -109,12 +110,14 @@ public: TReadIteratorPoints(TEvDataShard::TEvRead* request, ui64 tablet, const TActorId& parent, + const TSubLoadId& id, const TVector<TOwnedCellVec>& points, ui64 readCount) : BaseRequest(request) , Format(BaseRequest->Record.GetResultFormat()) , TabletId(tablet) , Parent(parent) + , Id(id) , Points(points) , ReadCount(readCount) { @@ -122,8 +125,8 @@ public: } void Bootstrap(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << " Bootstrap called, will read keys# " << Points.size()); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " Bootstrap called, will read keys# " << Points.size()); Become(&TReadIteratorPoints::StateFunc); @@ -138,16 +141,16 @@ public: private: void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << " Connect to# " << TabletId << " called"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " Connect to# " << TabletId << " called"); Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); } void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { TStringStream ss; @@ -160,8 +163,8 @@ private: } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << " Handle TEvClientDestroyed called"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " Handle TEvClientDestroyed called"); return StopWithError(ctx, "broken pipe"); } @@ -217,8 +220,8 @@ private: } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << ", stopped with error: " << reason); + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << ", stopped with error: " << reason); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); NTabletPipe::CloseClient(SelfId(), Pipe); @@ -226,8 +229,8 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId() - << " with parent# " << Parent << " tablet recieved PoisonPill, going to die"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " tablet recieved PoisonPill, going to die"); // TODO: cancel iterator return Die(ctx); @@ -249,6 +252,7 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { const NKikimrTxDataShard::EScanDataFormat Format; const ui64 TabletId; const TActorId Parent; + const TSubLoadId Id; const ui64 SampleKeyCount; TActorId Pipe; @@ -259,18 +263,23 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { TVector<TOwnedCellVec> SampledKeys; public: - TReadIteratorScan(TEvDataShard::TEvRead* request, ui64 tablet, const TActorId& parent, ui64 sample) + TReadIteratorScan(TEvDataShard::TEvRead* request, + ui64 tablet, + const TActorId& parent, + const TSubLoadId& id, + ui64 sample) : Request(request) , Format(Request->Record.GetResultFormat()) , TabletId(tablet) , Parent(parent) + , Id(id) , SampleKeyCount(sample) { } void Bootstrap(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " Bootstrap called, sample# " << SampleKeyCount); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Bootstrap called, sample# " << SampleKeyCount); Become(&TReadIteratorScan::StateFunc); Connect(ctx); @@ -278,16 +287,16 @@ public: private: void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " Connect to# " << TabletId << " called"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Connect to# " << TabletId << " called"); Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); } void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { TStringStream ss; @@ -300,8 +309,8 @@ private: } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " Handle TEvClientDestroyed called"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " Handle TEvClientDestroyed called"); return StopWithError(ctx, "broken pipe"); } @@ -340,8 +349,8 @@ private: } if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " finished in " << delta + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " finished in " << delta << ", sampled# " << SampledKeys.size() << ", iter finished# " << record.GetFinished() << ", oks# " << Oks); @@ -352,8 +361,8 @@ private: return; } else if (record.GetFinished()) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " finished in " << delta + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " finished in " << delta << ", read# " << Oks); auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); @@ -368,8 +377,8 @@ private: } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << ", stopped with error: " << reason); + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << ", stopped with error: " << reason); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); NTabletPipe::CloseClient(SelfId(), Pipe); @@ -377,8 +386,8 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId() - << " with parent# " << Parent << " tablet recieved PoisonPill, going to die"); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id + << " tablet recieved PoisonPill, going to die"); // TODO: cancel iterator return Die(ctx); @@ -407,7 +416,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target; const TActorId Parent; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - const ui64 Tag; + const TSubLoadId Id; // used to measure full run of this actor TInstant StartTs; @@ -437,6 +446,8 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce EState State = EState::DescribePath; ui64 Inflight = 0; + ui64 LastSubTag = 0; + // setup for fullscan TVector<ui64> ChunkSizes = {0, 0, 1, 1, 10, 10, 100, 100, 1000, 1000}; // each twice intentionally size_t ChunkIndex = 0; @@ -452,12 +463,12 @@ public: const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag) + const TSubLoadId& id) : Config(cmd) , Target(target) , Parent(parent) , Counters(std::move(counters)) - , Tag(tag) + , Id(id) , HeadReadsHist(1000, 4) { google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); @@ -485,7 +496,7 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << SelfId() - << " with tag# " << Tag << " Bootstrap called: " << ConfingString); + << " with id# " << Id << " Bootstrap called: " << ConfingString); Become(&TReadIteratorLoadScenario::StateFunc); StartTs = TInstant::Now(); @@ -543,7 +554,7 @@ private: AllColumnIds.push_back(column.GetId()); } - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " will work with tablet# " << TabletId << " with ownerId# " << OwnerId << " with tableId# " << TableId << " resolved for path# " << Target.GetWorkingDir() << "/" << Target.GetTableName() @@ -580,7 +591,8 @@ private: record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); - auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), sampleKeys); + TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); + auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys); StartedActors.emplace_back(ctx.Register(actor)); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back()); @@ -651,7 +663,7 @@ private: void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) { Keys = std::move(ev->Get()->Keys); - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " received keyCount# " << Keys.size()); State = EState::ReadHeadPoints; @@ -684,16 +696,18 @@ private: record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); + TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag); auto* readActor = new TReadIteratorPoints( request.release(), TabletId, SelfId(), + subId, Keys, ReadCount); StartedActors.emplace_back(ctx.Register(readActor)); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " started read actor with id# " << StartedActors.back()); } @@ -701,7 +715,7 @@ private: --Inflight; const auto& requestTimes = ev->Get()->RequestTimes; - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " received point times# " << requestTimes.size() << ", Inflight left# " << Inflight); for (auto t: requestTimes) { @@ -743,9 +757,9 @@ private: auto ts = TInstant::Now(); auto delta = ts - StartTs; - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag); auto& report = *response->Record.MutableReport(); - report.SetTag(Tag); + report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); report.SetOperationsOK(Oks); report.SetOperationsError(0); @@ -758,7 +772,7 @@ private: report.SetInfo(ss.Str()); report.SetSubtestCount(Results.size()); - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " finished in " << delta << " with report:\n" << report.GetInfo()); ctx.Send(Parent, response.release()); @@ -769,22 +783,22 @@ private: void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) { TStringStream str; HTML(str) { - str << "ReadIteratorLoadScenario# " << Tag << " started on " << StartTs; + str << "ReadIteratorLoadScenario# " << Id << " started on " << StartTs; } - ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str())); + ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str())); } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " tablet recieved PoisonPill, going to die"); Stop(ctx); } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " stopped with error: " << reason); - ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); + ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); Stop(ctx); } @@ -811,9 +825,9 @@ IActor *CreateReadIteratorActor( const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd, const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target, const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - ui64 tag) + const TSubLoadId& id) { - return new TReadIteratorLoadScenario(cmd, target, parent, std::move(counters), tag); + return new TReadIteratorLoadScenario(cmd, target, parent, std::move(counters), id); } } // NKikimr::NDataShardLoad |