aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-16 13:58:57 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-16 13:58:57 +0300
commit5fbc1694e011960d1cf103b82034a0ff61f0d6ce (patch)
tree1545a7f42a6a88901befeca959c48a2b44a7ae78
parent353bf4878abf72bf834becde2eca008a84ea1855 (diff)
downloadydb-5fbc1694e011960d1cf103b82034a0ff61f0d6ce.tar.gz
notify load starter, when load finished. Tags cleanup
-rw-r--r--ydb/core/protos/datashard_load.proto34
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp71
-rw-r--r--ydb/core/tx/datashard/testload/actors.h2
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.cpp248
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.h2
-rw-r--r--ydb/core/tx/datashard/testload/test_load_upsert.cpp1
6 files changed, 196 insertions, 162 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 6ed5f767525..5700539266b 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -12,35 +12,36 @@ message TEvTestLoadRequest {
}
message TUpdateStart {
- optional uint64 Tag = 1;
- optional uint64 RowCount = 2;
- optional uint64 TabletId = 3;
- optional uint64 TableId = 4;
- optional uint32 Inflight = 5;
+ optional uint64 RowCount = 1;
+ optional uint64 TabletId = 2;
+ optional uint64 TableId = 3;
+ optional uint32 Inflight = 4;
// in some cases we need full DB path with table
optional string Path = 6;
}
message TReadStart {
- optional uint64 Tag = 1;
- optional uint64 TabletId = 2;
- optional uint64 TableId = 3;
+ optional uint64 TabletId = 1;
+ optional uint64 TableId = 2;
// Specifies the format for result data in TEvReadResult
- optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4;
+ optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 3;
}
optional uint64 Cookie = 1;
+ // normally should be used by ut only
+ optional bool NotifyWhenFinished = 2;
+
oneof Command {
- TLoadStop LoadStop = 2;
- TUpdateStart UpsertBulkStart = 3;
- TUpdateStart UpsertLocalMkqlStart = 4;
- TUpdateStart UpsertKqpStart = 5;
- TUpdateStart UpsertProposeStart = 6;
+ TLoadStop LoadStop = 20;
+ TUpdateStart UpsertBulkStart = 21;
+ TUpdateStart UpsertLocalMkqlStart = 22;
+ TUpdateStart UpsertKqpStart = 23;
+ TUpdateStart UpsertProposeStart = 24;
- TReadStart ReadIteratorStart = 7;
+ TReadStart ReadIteratorStart = 25;
}
}
@@ -48,4 +49,7 @@ message TEvTestLoadResponse {
optional uint32 Status = 1; // EResponseStatus from ydb/core/client/base/msgbus.h
optional string ErrorReason = 2;
optional uint64 Cookie = 3;
+
+ // if load acter has been started, then it is the assigned tag
+ optional uint64 Tag = 4;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index 671e1df1474..a13da3e9595 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -159,42 +159,54 @@ struct TTestHelper {
return WaitReadResult();
}
- void TestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request, size_t expectedRowCount) {
+ void RunTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request) {
+ request->Record.SetNotifyWhenFinished(true);
auto &runtime = *Server->GetRuntime();
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters());
auto testLoadActor = runtime.Register(CreateTestLoadActor(counters));
runtime.Send(new IEventHandle(testLoadActor, Sender, request.release()), 0, true);
- TAutoPtr<IEventHandle> handle;
- runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle);
- UNIT_ASSERT(handle);
- auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>();
- auto& responseRecord = response->Record;
- UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ {
+ // check load started
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadResponse>(handle);
+ UNIT_ASSERT(handle);
+ auto response = handle->Release<TEvDataShardLoad::TEvTestLoadResponse>();
+ auto& responseRecord = response->Record;
+ UNIT_ASSERT_VALUES_EQUAL(responseRecord.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ }
+
+ {
+ // wait until load finished
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadFinished>(handle);
+ UNIT_ASSERT(handle);
+ auto response = handle->Release<TEvDataShardLoad::TEvTestLoadFinished>();
+ UNIT_ASSERT(response->Report);
+ UNIT_ASSERT(!response->ErrorReason);
+ }
+ }
+
+ void RunUpsertTestLoad(std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> loadRequest, size_t expectedRowCount) {
+ RunTestLoad(std::move(loadRequest));
// holds memory for TCell
TVector<TString> from = {TString("user")};
TVector<TString> to = {TString("zzz")};
- // busywait
- while (1) {
- auto request = GetBaseReadRequest();
- AddRangeQuery(
- *request,
- from,
- true,
- to,
- true
- );
-
- auto readResult = SendRead(request.release());
- UNIT_ASSERT(readResult);
- if (readResult->GetRowsCount() == expectedRowCount)
- break;
-
- SimulateSleep(Server, TDuration::Seconds(1));
- }
+ auto request = GetBaseReadRequest();
+ AddRangeQuery(
+ *request,
+ from,
+ true,
+ to,
+ true
+ );
+
+ auto readResult = SendRead(request.release());
+ UNIT_ASSERT(readResult);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), expectedRowCount);
}
public:
@@ -217,13 +229,12 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
auto& record = request->Record;
auto& command = *record.MutableUpsertBulkStart();
- command.SetTag(1);
command.SetRowCount(expectedRowCount);
command.SetTabletId(helper.Table.TabletId);
command.SetTableId(helper.Table.UserTable.GetPathId());
command.SetInflight(3);
- helper.TestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteDataBulkUpsertLocalMkql) {
@@ -235,13 +246,12 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
auto& record = request->Record;
auto& command = *record.MutableUpsertLocalMkqlStart();
- command.SetTag(1);
command.SetRowCount(expectedRowCount);
command.SetTabletId(helper.Table.TabletId);
command.SetTableId(helper.Table.UserTable.GetPathId());
command.SetInflight(3);
- helper.TestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
}
Y_UNIT_TEST(ShouldWriteKqpUpsert) {
@@ -253,14 +263,13 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
auto& record = request->Record;
auto& command = *record.MutableUpsertKqpStart();
- command.SetTag(1);
command.SetRowCount(expectedRowCount);
command.SetTabletId(helper.Table.TabletId);
command.SetTableId(helper.Table.UserTable.GetPathId());
command.SetInflight(5);
command.SetPath("/Root");
- helper.TestLoad(std::move(request), expectedRowCount);
+ helper.RunUpsertTestLoad(std::move(request), expectedRowCount);
}
} // Y_UNIT_TEST_SUITE(UpsertLoad)
diff --git a/ydb/core/tx/datashard/testload/actors.h b/ydb/core/tx/datashard/testload/actors.h
index 13dff18d8b5..6fe5d648914 100644
--- a/ydb/core/tx/datashard/testload/actors.h
+++ b/ydb/core/tx/datashard/testload/actors.h
@@ -32,6 +32,4 @@ class TLoadActorException : public yexception {
} \
} while (false)
-#define VERIFY_PARAM(NAME) VERIFY_PARAM2(cmd, NAME)
-
} // NKikimr::NDataShardLoad
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp
index 07194b91aa8..cec2b3674c9 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp
@@ -18,6 +18,17 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> {
TString Data; // HTML response
};
+ struct TRunningActorInfo {
+ TActorId ActorId;
+ TActorId Parent; // if set we notify parent when actor finishes
+
+ explicit TRunningActorInfo(const TActorId& actorId, const TActorId& parent = {})
+ : ActorId(actorId)
+ , Parent(parent)
+ {
+ }
+ };
+
// per-request info
struct THttpInfoRequest {
TActorId Origin; // who asked for status
@@ -38,7 +49,8 @@ class TLoadActor : public TActorBootstrapped<TLoadActor> {
TVector<TFinishedTestInfo> FinishedTests;
// currently running load actors
- TMap<ui64, TActorId> LoadActors;
+ TMap<ui64, TRunningActorInfo> LoadActors;
+ ui64 LastTag = 0; // tags start from 1
// next HTTP request identifier
ui32 NextRequestId;
@@ -63,11 +75,12 @@ public:
}
void Handle(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
ui32 status = NMsgBusProxy::MSTATUS_OK;
TString error;
- const auto& record = ev->Get()->Record;
+ ui64 tag = 0;
try {
- ProcessCmd(record, ctx);
+ tag = ProcessCmd(ev, ctx);
} catch (const TLoadActorException& ex) {
LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "Exception while creating load actor, what# "
<< ex.what());
@@ -82,116 +95,134 @@ public:
if (record.HasCookie()) {
response->Record.SetCookie(record.GetCookie());
}
+ if (tag) {
+ response->Record.SetTag(tag);
+ }
ctx.Send(ev->Sender, response.release());
}
- template<typename T>
- ui64 GetOrGenerateTag(const T& cmd) {
- if (cmd.HasTag()) {
- return cmd.GetTag();
- } else {
- if (LoadActors.empty()) {
- return 1;
- } else {
- return LoadActors.rbegin()->first + 1;
- }
+ ui64 GetTag() {
+ auto tag = ++LastTag;
+
+ // just sanity check
+ if (LoadActors.contains(tag)) {
+ ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
}
+
+ return tag;
}
- void ProcessCmd(const NKikimrDataShardLoad::TEvTestLoadRequest& record, const TActorContext& ctx) {
+ ui64 ProcessCmd(TEvDataShardLoad::TEvTestLoadRequest::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
switch (record.Command_case()) {
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: {
- const auto& cmd = record.GetUpsertBulkStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateUpsertBulkActor(cmd, ctx.SelfID,
- GetServiceCounters(Counters, "load_actor"), tag)));
- break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: {
+ const auto& cmd = record.GetUpsertBulkStart();
+ const ui64 tag = GetTag();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new bulk upsert load actor with tag# " << tag);
+
+ auto* actor = CreateUpsertBulkActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag);
+ TRunningActorInfo actorInfo(ctx.Register(actor));
+ if (record.GetNotifyWhenFinished()) {
+ actorInfo.Parent = ev->Sender;
}
+ LoadActors.emplace(tag, std::move(actorInfo));
+ return tag;
+ }
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: {
- const auto& cmd = record.GetUpsertLocalMkqlStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new local mkql upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateLocalMkqlUpsertActor(cmd, ctx.SelfID,
- GetServiceCounters(Counters, "load_actor"), tag)));
- break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: {
+ const auto& cmd = record.GetUpsertLocalMkqlStart();
+ const ui64 tag = GetTag();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new local mkql upsert load actor with tag# " << tag);
+
+ auto* actor = CreateLocalMkqlUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag);
+ TRunningActorInfo actorInfo(ctx.Register(actor));
+ if (record.GetNotifyWhenFinished()) {
+ actorInfo.Parent = ev->Sender;
}
+ LoadActors.emplace(tag, std::move(actorInfo));
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: {
- const auto& cmd = record.GetUpsertKqpStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new kqp upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateKqpUpsertActor(cmd, ctx.SelfID,
- GetServiceCounters(Counters, "load_actor"), tag)));
- break;
+ return tag;
+ }
+
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: {
+ const auto& cmd = record.GetUpsertKqpStart();
+ const ui64 tag = GetTag();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new kqp upsert load actor with tag# " << tag);
+
+ auto* actor = CreateKqpUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag);
+ TRunningActorInfo actorInfo(ctx.Register(actor));
+ if (record.GetNotifyWhenFinished()) {
+ actorInfo.Parent = ev->Sender;
}
+ LoadActors.emplace(tag, std::move(actorInfo));
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: {
- const auto& cmd = record.GetUpsertProposeStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateProposeUpsertActor(cmd, ctx.SelfID,
- GetServiceCounters(Counters, "load_actor"), tag)));
- break;
+ return tag;
+ }
+
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: {
+ const auto& cmd = record.GetUpsertProposeStart();
+ const ui64 tag = GetTag();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new upsert load actor with tag# " << tag);
+
+ auto* actor = CreateProposeUpsertActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag);
+ TRunningActorInfo actorInfo(ctx.Register(actor));
+ if (record.GetNotifyWhenFinished()) {
+ actorInfo.Parent = ev->Sender;
}
+ LoadActors.emplace(tag, std::move(actorInfo));
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: {
- const auto& cmd = record.GetReadIteratorStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateReadIteratorActor(cmd, ctx.SelfID,
- GetServiceCounters(Counters, "load_actor"), tag)));
- break;
+ return tag;
+ }
+
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: {
+ const auto& cmd = record.GetReadIteratorStart();
+ const ui64 tag = GetTag();
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Create new read iterator load actor# " << tag);
+
+ auto* actor = CreateReadIteratorActor(cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), tag);
+ TRunningActorInfo actorInfo(ctx.Register(actor));
+ if (record.GetNotifyWhenFinished()) {
+ actorInfo.Parent = ev->Sender;
}
+ LoadActors.emplace(tag, std::move(actorInfo));
- case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: {
- const auto& cmd = record.GetLoadStop();
- if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors");
- for (auto& actorPair : LoadActors) {
- ctx.Send(actorPair.second, new TEvents::TEvPoisonPill);
- }
- LoadActors.clear();
- } else {
- VERIFY_PARAM(Tag);
- const ui64 tag = cmd.GetTag();
- auto it = LoadActors.find(tag);
- if (it == LoadActors.end()) {
- ythrow TLoadActorException()
- << Sprintf("load actor with Tag# %" PRIu64 " not found", tag);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete running load actor with tag# "
- << tag);
- ctx.Send(it->second, new TEvents::TEvPoisonPill);
- LoadActors.erase(it);
+ return tag;
+ }
+
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kLoadStop: {
+ const auto& cmd = record.GetLoadStop();
+ if (cmd.HasRemoveAllTags() && cmd.GetRemoveAllTags()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete all running load actors");
+ for (auto& actorPair : LoadActors) {
+ ctx.Send(actorPair.second.ActorId, new TEvents::TEvPoisonPill);
+ }
+ LoadActors.clear();
+ } else {
+ if (!cmd.HasTag()) {
+ ythrow TLoadActorException() << "Either RemoveAllTags or Tag must present";
}
- break;
+ const ui64 tag = cmd.GetTag();
+ auto it = LoadActors.find(tag);
+ if (it == LoadActors.end()) {
+ ythrow TLoadActorException()
+ << Sprintf("load actor with Tag# %" PRIu64 " not found", tag);
+ }
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Delete running load actor# " << tag);
+ ctx.Send(it->second.ActorId, new TEvents::TEvPoisonPill);
+ LoadActors.erase(it);
}
- default: {
- TString protoTxt;
- google::protobuf::TextFormat::PrintToString(record, &protoTxt);
- ythrow TLoadActorException() << (TStringBuilder()
- << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: "
- << ui32(record.Command_case())
- << " protoTxt# " << protoTxt.Quote());
- }
+ return 0;
+ }
+
+ default: {
+ TString protoTxt;
+ google::protobuf::TextFormat::PrintToString(record, &protoTxt);
+ ythrow TLoadActorException() << (TStringBuilder()
+ << "TLoadActor::Handle(TEvDataShardLoad::TEvTestLoadRequest): unexpected command case: "
+ << ui32(record.Command_case())
+ << " protoTxt# " << protoTxt.Quote());
+ }
}
}
@@ -200,6 +231,15 @@ public:
auto it = LoadActors.find(msg->Tag);
Y_VERIFY(it != LoadActors.end());
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor with tag# " << msg->Tag << " finished");
+
+ if (it->second.Parent) {
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>();
+ response->Tag = ev->Get()->Tag;
+ response->ErrorReason = ev->Get()->ErrorReason;
+ response->Report = ev->Get()->Report;
+ ctx.Send(it->second.Parent, response.release());
+ }
+
LoadActors.erase(it);
FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report});
@@ -235,28 +275,10 @@ public:
info.ErrorMessage.clear();
- const auto& params = ev->Get()->Request.GetParams();
- if (params.Has("protobuf")) {
- NKikimrDataShardLoad::TEvTestLoadRequest record;
- bool status = google::protobuf::TextFormat::ParseFromString(params.Get("protobuf"), &record);
- if (status) {
- try {
- ProcessCmd(record, ctx);
- } catch (const TLoadActorException& ex) {
- info.ErrorMessage = ex.what();
- }
- } else {
- info.ErrorMessage = "bad protobuf";
- }
-
- GenerateHttpInfoRes(ctx, id, true);
- return;
- }
-
// send messages to subactors
for (const auto& kv : LoadActors) {
- ctx.Send(kv.second, new NMon::TEvHttpInfo(ev->Get()->Request, id));
- info.ActorMap[kv.second].Tag = kv.first;
+ ctx.Send(kv.second.ActorId, new NMon::TEvHttpInfo(ev->Get()->Request, id));
+ info.ActorMap[kv.second.ActorId].Tag = kv.first;
}
// record number of responses pending
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h
index acb6d7cdeac..b4a0549c19f 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.h
+++ b/ydb/core/tx/datashard/testload/test_load_actor.h
@@ -54,6 +54,8 @@ struct TEvDataShardLoad {
std::optional<TLoadReport> Report;
TString ErrorReason;
+ TEvTestLoadFinished() = default;
+
TEvTestLoadFinished(ui64 tag, const TString& error = {})
: Tag(tag)
, ErrorReason(error)
diff --git a/ydb/core/tx/datashard/testload/test_load_upsert.cpp b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
index e13a219c647..cdc2b29b9c5 100644
--- a/ydb/core/tx/datashard/testload/test_load_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_upsert.cpp
@@ -439,7 +439,6 @@ private:
auto configCopy = Config;
configCopy.SetInflight(1); // we have only 1 session
configCopy.SetRowCount(requestsPerActor);
- configCopy.SetTag(pseudoTag);
auto* kqpActor = new TKqpUpsertActor(
configCopy,