aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-01-09 22:20:31 +0300
committereivanov89 <eivanov89@ydb.tech>2023-01-09 22:20:31 +0300
commit686f4f5fb194fae77e30db2dadf9f9166c65d1de (patch)
treec8e01081be7fe0ff8b0c7d631b5bf932c5feaa92
parent71dae704b988f15eee2751c291c415263c9b35e3 (diff)
downloadydb-686f4f5fb194fae77e30db2dadf9f9166c65d1de.tar.gz
cleanup tag ussage in subactors (use special subId)
-rw-r--r--ydb/core/load_test/ycsb/actors.h10
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp53
-rw-r--r--ydb/core/load_test/ycsb/kqp_upsert.cpp71
-rw-r--r--ydb/core/load_test/ycsb/test_load_actor.cpp23
-rw-r--r--ydb/core/load_test/ycsb/test_load_actor.h12
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp110
6 files changed, 158 insertions, 121 deletions
diff --git a/ydb/core/load_test/ycsb/actors.h b/ydb/core/load_test/ycsb/actors.h
index 542bb27eab..759bc60f05 100644
--- a/ydb/core/load_test/ycsb/actors.h
+++ b/ydb/core/load_test/ycsb/actors.h
@@ -12,35 +12,35 @@ IActor *CreateUpsertBulkActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag);
+ const TSubLoadId& id);
IActor *CreateLocalMkqlUpsertActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag);
+ const TSubLoadId& id);
IActor *CreateKqpUpsertActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag);
+ const TSubLoadId& id);
IActor *CreateProposeUpsertActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag);
+ const TSubLoadId& id);
IActor *CreateReadIteratorActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag);
+ const TSubLoadId& id);
class TLoadManagerException : public yexception {
};
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index 17a9e74ceb..fd5e4613f6 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -128,7 +128,7 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config;
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
const TActorId Parent;
- const ui64 Tag;
+ const TSubLoadId Id;
const ERequestType RequestType;
TString ConfingString;
@@ -152,12 +152,12 @@ public:
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag,
+ const TSubLoadId& id,
ERequestType requestType)
: Config(cmd)
, Target(target)
, Parent(parent)
- , Tag(tag)
+ , Id(id)
, RequestType(requestType)
{
Y_UNUSED(counters);
@@ -165,7 +165,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor Bootstrap called: " << ConfingString);
// note that we generate all requests at once to send at max speed, i.e.
@@ -184,14 +184,14 @@ public:
private:
void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag << " TUpsertActor Connect called");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called");
Pipe = Register(NTabletPipe::CreateClient(SelfId(), Target.GetTabletId()));
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
@@ -206,7 +206,7 @@ private:
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor Handle TEvClientDestroyed called");
StopWithError(ctx, "broken pipe");
}
@@ -214,8 +214,8 @@ private:
void SendRows(const TActorContext &ctx) {
while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) {
const auto* request = Requests[CurrentRequest].get();
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
- << "TUpsertActor# " << Tag << " send request# " << CurrentRequest << ": " << request->ToString());
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
+ << "TUpsertActor# " << Id << " send request# " << CurrentRequest << ": " << request->ToString());
NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
++CurrentRequest;
++Inflight;
@@ -229,30 +229,30 @@ private:
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
auto& report = *response->Record.MutableReport();
- report.SetTag(Tag);
+ report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
report.SetOperationsOK(Requests.size() - Errors);
report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor finished in " << delta << ", errors=" << Errors);
Die(ctx);
}
}
void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr ev, const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
--Inflight;
TEvDataShard::TEvUploadRowsResponse *msg = ev->Get();
if (msg->Record.GetStatus() != 0) {
++Errors;
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor TEvUploadRowsResponse: " << msg->ToString());
}
@@ -260,14 +260,14 @@ private:
}
void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr ev, const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor received from " << ev->Sender << ": " << ev->Get()->Record);
--Inflight;
TEvTablet::TEvLocalMKQLResponse *msg = ev->Get();
if (msg->Record.GetStatus() != 0) {
++Errors;
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor TEvLocalMKQLResponse: " << msg->ToString());
}
@@ -281,7 +281,7 @@ private:
void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
TStringStream str;
HTML(str) {
- str << "DS bulk upsert load actor# " << Tag << " started on " << StartTs
+ str << "DS bulk upsert load actor# " << Id << " started on " << StartTs
<< " sent " << CurrentRequest << " out of " << Requests.size();
TInstant ts = EndTs ? EndTs : TInstant::Now();
auto delta = ts - StartTs;
@@ -290,7 +290,7 @@ private:
<< " errors=" << Errors;
}
- ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str()));
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
}
void HandlePoison(const TActorContext& ctx) {
@@ -301,7 +301,7 @@ private:
void StopWithError(const TActorContext& ctx, const TString& reason) {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
@@ -321,27 +321,30 @@ private:
IActor *CreateUpsertBulkActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
- const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+ const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id)
{
- return new TUpsertActor(cmd, target, parent, std::move(counters), tag, ERequestType::UpsertBulk);
+ return new TUpsertActor(cmd, target, parent, std::move(counters), id, ERequestType::UpsertBulk);
}
IActor *CreateLocalMkqlUpsertActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
- const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+ const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id)
{
- return new TUpsertActor(cmd, target, parent, std::move(counters), tag, ERequestType::UpsertLocalMkql);
+ return new TUpsertActor(cmd, target, parent, std::move(counters), id, ERequestType::UpsertLocalMkql);
}
IActor *CreateProposeUpsertActor(const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
- const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, ui64 tag)
+ const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TSubLoadId& id)
{
Y_UNUSED(cmd);
Y_UNUSED(target);
Y_UNUSED(parent);
Y_UNUSED(counters);
- Y_UNUSED(tag);
+ Y_UNUSED(id);
return nullptr; // not yet implemented
}
diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp
index ecfd3c28a0..54f2217c9a 100644
--- a/ydb/core/load_test/ycsb/kqp_upsert.cpp
+++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp
@@ -72,14 +72,13 @@ TQueryInfo GenerateUpsert(size_t n, const TString& table) {
return TQueryInfo(str.Str(), std::move(params));
}
-
// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that
// it seems better to have copy-paste rather if/else for different loads
class TKqpUpsertActor : public TActorBootstrapped<TKqpUpsertActor> {
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config;
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
const TActorId Parent;
- const ui64 Tag;
+ const TSubLoadId Id;
const TString Database;
TString ConfingString;
@@ -99,12 +98,12 @@ public:
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag,
+ const TSubLoadId& id,
TRequestsVector requests)
: Config(cmd)
, Target(target)
, Parent(parent)
- , Tag(tag)
+ , Id(id)
, Database(Target.GetWorkingDir())
, Requests(std::move(requests))
{
@@ -113,7 +112,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " Bootstrap called: " << ConfingString);
Become(&TKqpUpsertActor::StateFunc);
@@ -123,7 +122,7 @@ public:
private:
void CreateSession(const TActorContext& ctx) {
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " sends event for session creation to proxy: " << kqpProxy.ToString());
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
@@ -136,7 +135,7 @@ private:
return;
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " sends session close query to proxy: " << kqpProxy);
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
@@ -150,7 +149,7 @@ private:
request->Record.MutableRequest()->SetSessionId(Session);
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " send request# " << CurrentRequest
<< " to proxy# " << kqpProxy << ": " << request->ToString());
@@ -168,23 +167,24 @@ private:
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
auto& report = *response->Record.MutableReport();
- report.SetTag(Tag);
+ report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
report.SetOperationsOK(Requests.size() - Errors);
report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " finished in " << delta << ", errors=" << Errors);
Die(ctx);
}
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag << " tablet recieved PoisonPill, going to die");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
+ << " tablet recieved PoisonPill, going to die");
CloseSession(ctx);
Die(ctx);
}
@@ -194,7 +194,7 @@ private:
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
Session = response.GetResponse().GetSessionId();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag << " session: " << Session);
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " session: " << Session);
SendRows(ctx);
} else {
StopWithError(ctx, "failed to create session: " + ev->Get()->ToString());
@@ -202,7 +202,7 @@ private:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " received from " << ev->Sender << ": " << ev->Get()->Record.DebugString());
--Inflight;
@@ -217,14 +217,14 @@ private:
void StopWithError(const TActorContext& ctx, const TString& reason) {
LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
Die(ctx);
}
void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
TStringStream str;
HTML(str) {
- str << "TKqpUpsertActor# " << Tag << " started on " << StartTs
+ str << "TKqpUpsertActor# " << Id << " started on " << StartTs
<< " sent " << CurrentRequest << " out of " << Requests.size();
TInstant ts = EndTs ? EndTs : TInstant::Now();
auto delta = ts - StartTs;
@@ -233,7 +233,7 @@ private:
<< " errors=" << Errors;
}
- ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str()));
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
}
STRICT_STFUNC(StateFunc,
@@ -249,12 +249,13 @@ class TKqpUpsertActorMultiSession : public TActorBootstrapped<TKqpUpsertActorMul
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TUpdateStart Config;
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
const TActorId Parent;
- const ui64 Tag;
+ const TSubLoadId Id;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
const TString Database;
TString ConfingString;
+ ui64 LastSubTag = 0;
TVector<TActorId> Actors;
size_t Inflight = 0;
@@ -270,11 +271,11 @@ public:
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag)
+ const TSubLoadId& id)
: Config(cmd)
, Target(target)
, Parent(parent)
- , Tag(tag)
+ , Id(id)
, Counters(counters)
, Database(target.GetWorkingDir())
{
@@ -283,7 +284,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " Bootstrap called: " << ConfingString);
Become(&TKqpUpsertActorMultiSession::StateFunc);
@@ -331,7 +332,7 @@ private:
Actors.reserve(actorsCount);
Inflight = actorsCount;
for (size_t i = 0; i < actorsCount; ++i) {
- ui32 pseudoTag = 1000000 + i;
+ TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
auto configCopy = Config;
configCopy.SetInflight(1); // we have only 1 session
configCopy.SetRowCount(requestsPerActor);
@@ -341,12 +342,12 @@ private:
Target,
SelfId(),
Counters,
- pseudoTag,
+ subId,
std::move(perActorRequests[i]));
Actors.emplace_back(ctx.Register(kqpActor));
}
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " started# " << actorsCount << " actors each with inflight# " << requestsPerActor);
}
@@ -362,7 +363,7 @@ private:
return;
}
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << ev->Get()->ToString());
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString());
Errors += record.GetReport().GetOperationsError();
Oks += record.GetReport().GetOperationsOK();
@@ -372,15 +373,15 @@ private:
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
auto& report = *response->Record.MutableReport();
- report.SetTag(Tag);
+ report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
report.SetOperationsOK(Oks);
report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " finished in " << delta << ", oks# " << Oks << ", errors# " << Errors);
Stop(ctx);
@@ -390,22 +391,22 @@ private:
void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
TStringStream str;
HTML(str) {
- str << "TKqpUpsertActorMultiSession# " << Tag << " started on " << StartTs;
+ str << "TKqpUpsertActorMultiSession# " << Id << " started on " << StartTs;
}
- ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str()));
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " tablet recieved PoisonPill, going to die");
Stop(ctx);
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " stopped with error: " << reason);
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
Stop(ctx);
}
@@ -431,9 +432,9 @@ IActor *CreateKqpUpsertActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag)
+ const TSubLoadId& id)
{
- return new TKqpUpsertActorMultiSession(cmd, target, parent, std::move(counters), tag);
+ return new TKqpUpsertActorMultiSession(cmd, target, parent, std::move(counters), id);
}
} // NKikimr::NDataShardLoad
diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp
index 1bc0d52491..4ae6a2994b 100644
--- a/ydb/core/load_test/ycsb/test_load_actor.cpp
+++ b/ydb/core/load_test/ycsb/test_load_actor.cpp
@@ -49,7 +49,7 @@ private:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
EState State = EState::Init;
- ui64 LastTag;
+ ui64 LastTag = 0;
TInstant StartTs;
THashSet<TActorId> LoadActors;
@@ -74,7 +74,6 @@ public:
, Tag(tag)
, Request(std::move(request))
, Counters(counters)
- , LastTag(tag)
{}
void Bootstrap(const TActorContext& ctx) {
@@ -346,7 +345,7 @@ public:
target,
ctx.SelfID,
GetServiceCounters(Counters, "load_actor"),
- ++LastTag)));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag))));
}
void RunLoad(const TActorContext& ctx) {
@@ -362,7 +361,7 @@ public:
Request.GetTargetShard(),
ctx.SelfID,
counters,
- tag));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertLocalMkqlStart:
actor.reset(CreateLocalMkqlUpsertActor(
@@ -370,7 +369,7 @@ public:
Request.GetTargetShard(),
ctx.SelfID,
counters,
- tag));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertKqpStart:
actor.reset(CreateKqpUpsertActor(
@@ -378,7 +377,7 @@ public:
Request.GetTargetShard(),
ctx.SelfID,
counters,
- tag));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kUpsertProposeStart:
actor.reset(CreateProposeUpsertActor(
@@ -386,7 +385,7 @@ public:
Request.GetTargetShard(),
ctx.SelfID,
counters,
- tag));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
case NKikimrDataShardLoad::TEvYCSBTestLoadRequest::CommandCase::kReadIteratorStart:
actor.reset(CreateReadIteratorActor(
@@ -394,7 +393,7 @@ public:
Request.GetTargetShard(),
ctx.SelfID,
counters,
- tag));
+ TSubLoadId(Tag, ctx.SelfID, ++LastTag)));
break;
default: {
TStringStream ss;
@@ -613,3 +612,11 @@ inline void Out<NKikimr::NDataShardLoad::TLoad::EState>(
break;
}
}
+
+template <>
+void Out<NKikimr::NDataShardLoad::TSubLoadId>(
+ IOutputStream& o,
+ const NKikimr::NDataShardLoad::TSubLoadId& loadId)
+{
+ o << "{Tag: " << loadId.Tag << ", parent: " << loadId.Parent << ", subTag: " << loadId.SubTag << "}";
+}
diff --git a/ydb/core/load_test/ycsb/test_load_actor.h b/ydb/core/load_test/ycsb/test_load_actor.h
index ff8e95e59f..556f3c5b92 100644
--- a/ydb/core/load_test/ycsb/test_load_actor.h
+++ b/ydb/core/load_test/ycsb/test_load_actor.h
@@ -95,5 +95,17 @@ IActor *CreateTestLoadActor(
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
ui64 tag);
+struct TSubLoadId {
+ ui64 Tag = 0; // tag assigned to the "main" load actor (i.e. controlling the load) by service actor
+ TActorId Parent; // parent of the subload
+ ui64 SubTag = 0;
+
+ TSubLoadId(ui64 tag, const TActorId& parent, ui64 subTag = 0)
+ : Tag(tag)
+ , Parent(parent)
+ , SubTag(subTag)
+ {}
+};
+
} // NDataShardLoad
} // NKikimr
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 05ad9a63a5..0cc161deb3 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -93,6 +93,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
const NKikimrTxDataShard::EScanDataFormat Format;
const ui64 TabletId;
const TActorId Parent;
+ const TSubLoadId Id;
TActorId Pipe;
@@ -109,12 +110,14 @@ public:
TReadIteratorPoints(TEvDataShard::TEvRead* request,
ui64 tablet,
const TActorId& parent,
+ const TSubLoadId& id,
const TVector<TOwnedCellVec>& points,
ui64 readCount)
: BaseRequest(request)
, Format(BaseRequest->Record.GetResultFormat())
, TabletId(tablet)
, Parent(parent)
+ , Id(id)
, Points(points)
, ReadCount(readCount)
{
@@ -122,8 +125,8 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << " Bootstrap called, will read keys# " << Points.size());
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " Bootstrap called, will read keys# " << Points.size());
Become(&TReadIteratorPoints::StateFunc);
@@ -138,16 +141,16 @@ public:
private:
void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << " Connect to# " << TabletId << " called");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " Connect to# " << TabletId << " called");
Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status);
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
TStringStream ss;
@@ -160,8 +163,8 @@ private:
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << " Handle TEvClientDestroyed called");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " Handle TEvClientDestroyed called");
return StopWithError(ctx, "broken pipe");
}
@@ -217,8 +220,8 @@ private:
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << ", stopped with error: " << reason);
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << ", stopped with error: " << reason);
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason));
NTabletPipe::CloseClient(SelfId(), Pipe);
@@ -226,8 +229,8 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << SelfId()
- << " with parent# " << Parent << " tablet recieved PoisonPill, going to die");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " tablet recieved PoisonPill, going to die");
// TODO: cancel iterator
return Die(ctx);
@@ -249,6 +252,7 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
const NKikimrTxDataShard::EScanDataFormat Format;
const ui64 TabletId;
const TActorId Parent;
+ const TSubLoadId Id;
const ui64 SampleKeyCount;
TActorId Pipe;
@@ -259,18 +263,23 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
TVector<TOwnedCellVec> SampledKeys;
public:
- TReadIteratorScan(TEvDataShard::TEvRead* request, ui64 tablet, const TActorId& parent, ui64 sample)
+ TReadIteratorScan(TEvDataShard::TEvRead* request,
+ ui64 tablet,
+ const TActorId& parent,
+ const TSubLoadId& id,
+ ui64 sample)
: Request(request)
, Format(Request->Record.GetResultFormat())
, TabletId(tablet)
, Parent(parent)
+ , Id(id)
, SampleKeyCount(sample)
{
}
void Bootstrap(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " Bootstrap called, sample# " << SampleKeyCount);
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Bootstrap called, sample# " << SampleKeyCount);
Become(&TReadIteratorScan::StateFunc);
Connect(ctx);
@@ -278,16 +287,16 @@ public:
private:
void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " Connect to# " << TabletId << " called");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Connect to# " << TabletId << " called");
Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " Handle TEvClientConnected called, Status# " << msg->Status);
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
TStringStream ss;
@@ -300,8 +309,8 @@ private:
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " Handle TEvClientDestroyed called");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " Handle TEvClientDestroyed called");
return StopWithError(ctx, "broken pipe");
}
@@ -340,8 +349,8 @@ private:
}
if (record.GetFinished() || SampledKeys.size() >= SampleKeyCount) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " finished in " << delta
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " finished in " << delta
<< ", sampled# " << SampledKeys.size()
<< ", iter finished# " << record.GetFinished()
<< ", oks# " << Oks);
@@ -352,8 +361,8 @@ private:
return;
} else if (record.GetFinished()) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " finished in " << delta
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " finished in " << delta
<< ", read# " << Oks);
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
@@ -368,8 +377,8 @@ private:
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << ", stopped with error: " << reason);
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << ", stopped with error: " << reason);
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason));
NTabletPipe::CloseClient(SelfId(), Pipe);
@@ -377,8 +386,8 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << SelfId()
- << " with parent# " << Parent << " tablet recieved PoisonPill, going to die");
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
+ << " tablet recieved PoisonPill, going to die");
// TODO: cancel iterator
return Die(ctx);
@@ -407,7 +416,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard Target;
const TActorId Parent;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- const ui64 Tag;
+ const TSubLoadId Id;
// used to measure full run of this actor
TInstant StartTs;
@@ -437,6 +446,8 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce
EState State = EState::DescribePath;
ui64 Inflight = 0;
+ ui64 LastSubTag = 0;
+
// setup for fullscan
TVector<ui64> ChunkSizes = {0, 0, 1, 1, 10, 10, 100, 100, 1000, 1000}; // each twice intentionally
size_t ChunkIndex = 0;
@@ -452,12 +463,12 @@ public:
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag)
+ const TSubLoadId& id)
: Config(cmd)
, Target(target)
, Parent(parent)
, Counters(std::move(counters))
- , Tag(tag)
+ , Id(id)
, HeadReadsHist(1000, 4)
{
google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
@@ -485,7 +496,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << SelfId()
- << " with tag# " << Tag << " Bootstrap called: " << ConfingString);
+ << " with id# " << Id << " Bootstrap called: " << ConfingString);
Become(&TReadIteratorLoadScenario::StateFunc);
StartTs = TInstant::Now();
@@ -543,7 +554,7 @@ private:
AllColumnIds.push_back(column.GetId());
}
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " will work with tablet# " << TabletId << " with ownerId# " << OwnerId
<< " with tableId# " << TableId << " resolved for path# "
<< Target.GetWorkingDir() << "/" << Target.GetTableName()
@@ -580,7 +591,8 @@ private:
record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), sampleKeys);
+ TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
+ auto* actor = new TReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys);
StartedActors.emplace_back(ctx.Register(actor));
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back());
@@ -651,7 +663,7 @@ private:
void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) {
Keys = std::move(ev->Get()->Keys);
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " received keyCount# " << Keys.size());
State = EState::ReadHeadPoints;
@@ -684,16 +696,18 @@ private:
record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ TSubLoadId subId(Id.Tag, SelfId(), ++LastSubTag);
auto* readActor = new TReadIteratorPoints(
request.release(),
TabletId,
SelfId(),
+ subId,
Keys,
ReadCount);
StartedActors.emplace_back(ctx.Register(readActor));
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " started read actor with id# " << StartedActors.back());
}
@@ -701,7 +715,7 @@ private:
--Inflight;
const auto& requestTimes = ev->Get()->RequestTimes;
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " received point times# " << requestTimes.size() << ", Inflight left# " << Inflight);
for (auto t: requestTimes) {
@@ -743,9 +757,9 @@ private:
auto ts = TInstant::Now();
auto delta = ts - StartTs;
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Id.SubTag);
auto& report = *response->Record.MutableReport();
- report.SetTag(Tag);
+ report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
report.SetOperationsOK(Oks);
report.SetOperationsError(0);
@@ -758,7 +772,7 @@ private:
report.SetInfo(ss.Str());
report.SetSubtestCount(Results.size());
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " finished in " << delta << " with report:\n" << report.GetInfo());
ctx.Send(Parent, response.release());
@@ -769,22 +783,22 @@ private:
void Handle(TEvDataShardLoad::TEvTestLoadInfoRequest::TPtr& ev, const TActorContext& ctx) {
TStringStream str;
HTML(str) {
- str << "ReadIteratorLoadScenario# " << Tag << " started on " << StartTs;
+ str << "ReadIteratorLoadScenario# " << Id << " started on " << StartTs;
}
- ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Tag, str.Str()));
+ ctx.Send(ev->Sender, new TEvDataShardLoad::TEvTestLoadInfoResponse(Id.SubTag, str.Str()));
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " tablet recieved PoisonPill, going to die");
Stop(ctx);
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " stopped with error: " << reason);
- ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
+ ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
Stop(ctx);
}
@@ -811,9 +825,9 @@ IActor *CreateReadIteratorActor(
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TReadStart& cmd,
const NKikimrDataShardLoad::TEvYCSBTestLoadRequest::TTargetShard& target,
const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- ui64 tag)
+ const TSubLoadId& id)
{
- return new TReadIteratorLoadScenario(cmd, target, parent, std::move(counters), tag);
+ return new TReadIteratorLoadScenario(cmd, target, parent, std::move(counters), id);
}
} // NKikimr::NDataShardLoad