aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-12-08 22:21:34 +0300
committereivanov89 <eivanov89@ydb.tech>2022-12-08 22:21:34 +0300
commit35e53e3bf1d4087b8c6fccfb6286001d94b5a68d (patch)
tree81f72384fc0208c7dbf697ca88b5d1d7795ca0cf
parent96b5ab9295df718b2f695b3ee1223d407d3de565 (diff)
downloadydb-35e53e3bf1d4087b8c6fccfb6286001d94b5a68d.tar.gz
PR from branch users/eivanov89/-improve-load-actors
switch to protobuf in load finished responce init load actors using batch upsert to speedup move data upsert from read iterator to common warmup proper enum serialization
-rw-r--r--ydb/core/protos/CMakeLists.txt5
-rw-r--r--ydb/core/protos/datashard_load.proto25
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp32
-rw-r--r--ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp48
-rw-r--r--ydb/core/tx/datashard/testload/info_collector.cpp8
-rw-r--r--ydb/core/tx/datashard/testload/kqp_upsert.cpp37
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.cpp86
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.h61
-rw-r--r--ydb/core/tx/datashard/testload/test_load_read_iterator.cpp126
9 files changed, 233 insertions, 195 deletions
diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt
index a4eebd1c08..138114d9d7 100644
--- a/ydb/core/protos/CMakeLists.txt
+++ b/ydb/core/protos/CMakeLists.txt
@@ -157,6 +157,11 @@ generate_enum_serilization(ydb-core-protos
INCLUDE_HEADERS
ydb/core/protos/blobstorage_pdisk_config.pb.h
)
+generate_enum_serilization(ydb-core-protos
+ ${CMAKE_BINARY_DIR}/ydb/core/protos/datashard_load.pb.h
+ INCLUDE_HEADERS
+ ydb/core/protos/datashard_load.pb.h
+)
target_proto_addincls(ydb-core-protos
./
${CMAKE_SOURCE_DIR}/
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index d85bdea56d..a0f4f0ca22 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -27,6 +27,9 @@ message TEvTestLoadRequest {
// special mode: actor writes RowCount rows again and again
optional bool Infinite = 4;
+
+ // only for bulk upsert
+ optional uint32 BatchSize = 5;
}
message TReadStart {
@@ -90,15 +93,29 @@ message TEvTestLoadResponse {
optional uint64 Tag = 4;
}
-message TLoadInfo {
+message TLoadReport {
optional uint64 Tag = 1;
- optional string Data = 2;
+
+ optional uint64 DurationMs = 2;
+ optional uint64 OperationsOK = 3;
+ optional uint64 OperationsError = 4;
+
+ // info might contain result for multiple subtests
+ optional string Info = 5;
+ optional uint64 SubtestCount = 6;
+ optional string PrefixInfo = 7;
}
message TEvTestLoadInfoRequest {
}
-// collector puts Infos sorted by Tag
+// collector puts Reports sorted by Tag
message TEvTestLoadInfoResponse {
- repeated TLoadInfo Infos = 1;
+ repeated TLoadReport Reports = 1;
+}
+
+message TEvTestLoadFinished {
+ optional uint64 Tag = 1;
+ optional TLoadReport Report = 2;
+ optional string ErrorReason = 3;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index 2e04771a9b..4ee4b073c7 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -241,8 +241,8 @@ struct TTestHelper {
runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadFinished>(handle);
UNIT_ASSERT(handle);
auto response = handle->Release<TEvDataShardLoad::TEvTestLoadFinished>();
- UNIT_ASSERT(response->Report);
- UNIT_ASSERT(!response->ErrorReason);
+ UNIT_ASSERT(response->Record.HasReport());
+ UNIT_ASSERT(!response->Record.HasErrorReason());
return std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished>(response.Release());
}
@@ -292,6 +292,28 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
}
+ Y_UNIT_TEST(ShouldWriteDataBulkUpsertBatch) {
+ // same as ShouldWriteDataBulkUpsert, but with batch size
+ TTestHelper helper;
+
+ const ui64 expectedRowCount = 100;
+
+ std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
+ auto& record = request->Record;
+ auto& command = *record.MutableUpsertBulkStart();
+
+ command.SetRowCount(expectedRowCount);
+ command.SetInflight(3);
+ command.SetBatchSize(7);
+
+ auto& target = *record.MutableTargetShard();
+ target.SetTabletId(helper.Table.TabletId);
+ target.SetTableId(helper.Table.UserTable.GetPathId());
+ target.SetTableName(DefaultTableName);
+
+ helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
+ }
+
Y_UNIT_TEST(ShouldWriteDataBulkUpsert2) {
// check nondefault tablename
TSettings settings("JustTable");
@@ -571,10 +593,10 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
setupTable.SetTableName("usertable");
auto result = helper.RunTestLoad(std::move(request));
- UNIT_ASSERT(result->Report);
+ UNIT_ASSERT(result->Record.HasReport());
- UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 4);
- UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount));
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReport().GetSubtestCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReport().GetOperationsOK(), (4 * expectedRowCount));
// sanity check that there was data in table
helper.CheckKeys(0, expectedRowCount);
diff --git a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
index 857342722b..30a270ead5 100644
--- a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp
@@ -30,7 +30,7 @@ enum class ERequestType {
UpsertLocalMkql,
};
-TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) {
+TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyStart, ui64 n) {
TUploadRowsRequestPtr request(new TEvDataShard::TEvUploadRowsRequest());
auto& record = request->Record;
record.SetTableId(tableId);
@@ -41,20 +41,22 @@ TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) {
}
rowScheme.AddKeyColumnIds(1);
- TVector<TCell> keys;
- keys.reserve(1);
- TString key = GetKey(keyNum);
- keys.emplace_back(key.data(), key.size());
+ for (size_t i = keyStart; i < keyStart + n; ++i) {
+ TVector<TCell> keys;
+ keys.reserve(1);
+ TString key = GetKey(i);
+ keys.emplace_back(key.data(), key.size());
- TVector<TCell> values;
- values.reserve(10);
- for (size_t i = 2; i <= 11; ++i) {
- values.emplace_back(Value.data(), Value.size());
- }
+ TVector<TCell> values;
+ values.reserve(10);
+ for (size_t i = 2; i <= 11; ++i) {
+ values.emplace_back(Value.data(), Value.size());
+ }
- auto& row = *record.AddRows();
- row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
- row.SetValueColumns(TSerializedCellVec::Serialize(values));
+ auto& row = *record.AddRows();
+ row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
+ row.SetValueColumns(TSerializedCellVec::Serialize(values));
+ }
return TUploadRequest(request.release());
}
@@ -95,6 +97,7 @@ TRequestsVector GenerateRequests(
ui64 tableId,
ui64 keyFrom,
ui64 n,
+ ui64 batchSize, // only bulk requests
ERequestType requestType,
const TString& table)
{
@@ -103,9 +106,12 @@ TRequestsVector GenerateRequests(
for (size_t i = keyFrom; i < keyFrom + n; ++i) {
switch (requestType) {
- case ERequestType::UpsertBulk:
- requests.emplace_back(GenerateBulkRowRequest(tableId, i));
+ case ERequestType::UpsertBulk: {
+ auto keysLeft = keyFrom + n - i;
+ auto currentBatchSize = Max(Min(batchSize, keysLeft), ui64(1));
+ requests.emplace_back(GenerateBulkRowRequest(tableId, i, currentBatchSize));
break;
+ }
case ERequestType::UpsertLocalMkql:
requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table));
break;
@@ -168,6 +174,7 @@ public:
Target.GetTableId(),
Config.GetKeyFrom(),
Config.GetRowCount(),
+ Config.GetBatchSize(),
RequestType,
Target.GetTableName());
@@ -221,11 +228,14 @@ private:
} else if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
+
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Requests.size() - Errors;
- response->Report->OperationsError = Errors;
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Tag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Requests.size() - Errors);
+ report.SetOperationsError(Errors);
+
ctx.Send(Parent, response.release());
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag
diff --git a/ydb/core/tx/datashard/testload/info_collector.cpp b/ydb/core/tx/datashard/testload/info_collector.cpp
index 3e5e03fea1..f82b9e4c34 100644
--- a/ydb/core/tx/datashard/testload/info_collector.cpp
+++ b/ydb/core/tx/datashard/testload/info_collector.cpp
@@ -11,7 +11,7 @@ class TInfoCollector : public TActorBootstrapped<TInfoCollector> {
const TActorId Parent;
TVector<TActorId> Actors;
- TMap<ui64, NKikimrDataShardLoad::TLoadInfo> Results;
+ TMap<ui64, NKikimrDataShardLoad::TLoadReport> Results;
ui64 ResponsesPending = 0;
@@ -35,8 +35,8 @@ public:
void Handle(TEvDataShardLoad::TEvTestLoadInfoResponse::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
- Y_VERIFY(record.InfosSize() == 1);
- Results[record.GetInfos(0).GetTag()] = std::move(record.GetInfos(0));
+ Y_VERIFY(record.ReportsSize() == 1);
+ Results[record.GetReports(0).GetTag()] = std::move(record.GetReports(0));
--ResponsesPending;
if (ResponsesPending == 0) {
@@ -48,7 +48,7 @@ public:
void Reply(const TActorContext& ctx) {
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>();
for (auto& it: Results) {
- *response->Record.AddInfos() = std::move(it.second);
+ *response->Record.AddReports() = std::move(it.second);
}
ctx.Send(Parent, response.release());
diff --git a/ydb/core/tx/datashard/testload/kqp_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
index a9a8f735bd..b8ab23c5d2 100644
--- a/ydb/core/tx/datashard/testload/kqp_upsert.cpp
+++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp
@@ -167,11 +167,14 @@ private:
} else if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
+
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Requests.size() - Errors;
- response->Report->OperationsError = Errors;
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Tag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Requests.size() - Errors);
+ report.SetOperationsError(Errors);
+
ctx.Send(Parent, response.release());
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag
@@ -348,31 +351,33 @@ private:
}
void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
- const auto* msg = ev->Get();
- if (msg->ErrorReason || !msg->Report) {
+ const auto& record = ev->Get()->Record;
+ if (record.HasErrorReason() || !record.HasReport()) {
TStringStream ss;
- ss << "kqp actor# " << msg->Tag << " finished with error: " << msg->ErrorReason;
- if (msg->Report)
- ss << ", report: " << msg->Report->ToString();
+ ss << "kqp actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason();
+ if (record.HasReport())
+ ss << ", report: " << ev->Get()->ToString();
StopWithError(ctx, ss.Str());
return;
}
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << msg->Report->ToString());
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << ev->Get()->ToString());
- Errors += msg->Report->OperationsError;
- Oks += msg->Report->OperationsOK;
+ Errors += record.GetReport().GetOperationsError();
+ Oks += record.GetReport().GetOperationsOK();
--Inflight;
if (Inflight == 0) {
EndTs = TInstant::Now();
auto delta = EndTs - StartTs;
+
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Oks;
- response->Report->OperationsError = Errors;
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Tag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Oks);
+ report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp
index 7ef8eb1025..a3ee50ba1f 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp
@@ -33,7 +33,7 @@ struct TFinishedTestInfo {
ui64 Tag;
TString ErrorReason;
TInstant FinishTime;
- std::optional<TEvDataShardLoad::TLoadReport> Report;
+ NKikimrDataShardLoad::TLoadReport Report;
};
// TLoad
@@ -333,6 +333,9 @@ public:
case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart:
cmd = Request.GetUpsertProposeStart();
break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart:
+ cmd.SetRowCount(Request.GetReadIteratorStart().GetRowCount());
+ break;
default:
State = EState::RunLoad;
return PrepareTable(ctx);
@@ -344,8 +347,8 @@ public:
<< " in dir# " << target.GetWorkingDir()
<< " with rows# " << cmd.GetRowCount());
- // TODO: we need bulk upsert with normal batch size, not 1 row per request
cmd.SetInflight(100);
+ cmd.SetBatchSize(100);
LoadActors.insert(ctx.Register(
CreateUpsertBulkActor(
@@ -405,7 +408,7 @@ public:
break;
default: {
TStringStream ss;
- ss << "TLoad: unexpected command case# " << ui32(Request.Command_case())
+ ss << "TLoad: unexpected command case# " << Request.Command_case()
<< ", proto# " << Request.DebugString();
StopWithError(ctx, ss.Str());
return;
@@ -413,18 +416,18 @@ public:
}
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " created load actor of type# "
- << ui32(Request.Command_case())<< " with tag# " << tag << ", proto# " << Request.DebugString());
+ << Request.Command_case() << " with tag# " << tag << ", proto# " << Request.DebugString());
LoadActors.insert(ctx.Register(actor.release()));
}
void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
- const auto& msg = ev->Get();
+ const auto& record = ev->Get()->Record;
LoadActors.erase(ev->Sender);
- if (msg->ErrorReason || !msg->Report) {
+ if (record.HasErrorReason() || !record.HasReport()) {
TStringStream ss;
- ss << "error from actor# " << ev->Sender << " with tag# " << msg->Tag;
+ ss << "error from actor# " << ev->Sender << " with tag# " << record.GetTag();
StopWithError(ctx, ss.Str());
return;
}
@@ -435,9 +438,9 @@ public:
}
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
- << " received finished from actor# " << ev->Sender << " with tag# " << msg->Tag);
+ << " received finished from actor# " << ev->Sender << " with tag# " << record.GetTag());
- FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report});
+ FinishedTests.push_back({record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()});
if (LoadActors.empty()) {
Finish(ctx);
@@ -448,21 +451,27 @@ public:
void Finish(const TActorContext& ctx) {
auto endTs = TAppData::TimeProvider->Now();
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>();
- response->Tag = Tag;
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = endTs - StartTs;
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Tag);
+ report.SetDurationMs((endTs - StartTs).MilliSeconds());
+ ui64 oks = 0;
+ ui64 errors = 0;
+ ui64 subtestCount = 0;
TStringStream ss;
for (const auto& test: FinishedTests) {
- Y_VERIFY(test.Report);
- response->Report->OperationsOK += test.Report->OperationsOK;
- response->Report->OperationsError += test.Report->OperationsError;
- response->Report->SubtestCount += test.Report->SubtestCount;
- if (test.Report->Info)
- ss << test.Report->Info << Endl;
+ oks += test.Report.GetOperationsOK();
+ errors += test.Report.GetOperationsError();
+ subtestCount += test.Report.GetSubtestCount();
+ if (test.Report.HasInfo())
+ ss << test.Report.GetInfo() << Endl;
}
+ report.SetOperationsOK(oks);
+ report.SetOperationsError(errors);
+ report.SetSubtestCount(subtestCount);
+
ctx.Send(Parent, response.release());
Die(ctx);
}
@@ -475,9 +484,9 @@ public:
<< ", in state# " << State;
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>();
- auto* info = response->Record.AddInfos();
+ auto* info = response->Record.AddReports();
info->SetTag(Tag);
- info->SetData(ss.Str());
+ info->SetInfo(ss.Str());
ctx.Send(ev->Sender, response.release());
return;
}
@@ -501,14 +510,14 @@ public:
<< ", finished# " << FinishedTests.size()
<< ", subactors infos: ";
- for (auto& info: ev->Get()->Record.GetInfos()) {
- ss << "{ tag: " << info.GetTag() << ", data: " << info.GetData() << " }";
+ for (auto& info: ev->Get()->Record.GetReports()) {
+ ss << "{ tag: " << info.GetTag() << ", info: " << info.GetInfo() << " }";
}
NKikimrDataShardLoad::TEvTestLoadInfoResponse record;
- auto* info = record.AddInfos();
- info->SetTag(Tag);
- info->SetData(ss.Str());
+ auto* report = record.AddReports();
+ report->SetTag(Tag);
+ report->SetInfo(ss.Str());
for (const auto& actorId: HttpInfoWaiters) {
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>();
@@ -674,23 +683,23 @@ public:
}
void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
- const auto& msg = ev->Get();
- auto it = LoadActors.find(msg->Tag);
- Y_VERIFY(it != LoadActors.end(), "%s", (TStringBuilder() << "failed to find actor with tag# " << msg->Tag
+ const auto& record = ev->Get()->Record;
+ auto it = LoadActors.find(record.GetTag());
+
+ Y_VERIFY(it != LoadActors.end(), "%s", (TStringBuilder() << "failed to find actor with tag# " << record.GetTag()
<< ", TEvTestLoadFinished from actor# " << ev->Sender).c_str());
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor# " << ev->Sender
- << " with tag# " << msg->Tag << " finished");
+ << " with tag# " << record.GetTag() << " finished");
if (it->second.Parent) {
- auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>();
- response->Tag = ev->Get()->Tag;
- response->ErrorReason = ev->Get()->ErrorReason;
- response->Report = ev->Get()->Report;
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(record.GetTag());
+ response->Record = record;
ctx.Send(it->second.Parent, response.release());
}
LoadActors.erase(it);
- FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report});
+ FinishedTests.push_back(
+ {record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()});
}
void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
@@ -727,13 +736,13 @@ public:
}
}
- for (const auto& info: record.GetInfos()) {
+ for (const auto& info: record.GetReports()) {
DIV_CLASS("panel panel-info") {
DIV_CLASS("panel-heading") {
str << "Tag# " << info.GetTag();
}
DIV_CLASS("panel-body") {
- str << info.GetData();
+ str << info.GetInfo();
}
}
}
@@ -746,8 +755,7 @@ public:
}
DIV_CLASS("panel-body") {
str << "<p>";
- if (req.Report)
- str << "Report# " << req.Report->ToString() << "<br/>";
+ str << "Report# " << req.Report << "<br/>";
str << "Finish reason# " << req.ErrorReason << "<br/>";
str << "Finish time# " << req.FinishTime << "<br/>";
str << "</p>";
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h
index 269fb22864..f6ac06af3e 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.h
+++ b/ydb/core/tx/datashard/testload/test_load_actor.h
@@ -36,49 +36,32 @@ struct TEvDataShardLoad {
TEvTestLoadResponse() = default;
};
- struct TLoadReport {
- TDuration Duration;
- ui64 OperationsOK = 0;
- ui64 OperationsError = 0;
+ struct TEvTestLoadFinished
+ : public TEventPB<TEvTestLoadFinished,
+ NKikimrDataShardLoad::TEvTestLoadFinished,
+ EvTestLoadFinished> {
- // info might contain result for multiple subtests
- TString Info;
- ui64 SubtestCount = 0;
+ TEvTestLoadFinished() = default;
- // used by test launchers to specify params and test number
- TString PrefixInfo;
+ TEvTestLoadFinished(ui64 tag, const TString& error = {})
+ {
+ Record.SetTag(tag);
+ if (error)
+ Record.SetErrorReason(error);
+ }
TString ToString() const {
TStringStream ss;
- if (PrefixInfo)
- ss << PrefixInfo << ". ";
-
- ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError;
- if (OperationsOK && Duration.Seconds()) {
- ui64 throughput = OperationsOK / Duration.Seconds();
- ss << ", throughput=" << throughput << " OK_ops/s";
+ ss << Record.GetTag();
+ if (Record.HasErrorReason()) {
+ ss << " failed: " << Record.GetErrorReason();
+ } else {
+ const auto& report = Record.GetReport();
+ ss << " " << report;
}
- if (SubtestCount) {
- ss << ", subtests: " << SubtestCount;
- }
- if (Info) {
- ss << ", Info: " << Info;
- }
- return ss.Str();
- }
- };
-
- struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, EvTestLoadFinished> {
- ui64 Tag;
- std::optional<TLoadReport> Report;
- TString ErrorReason;
-
- TEvTestLoadFinished() = default;
- TEvTestLoadFinished(ui64 tag, const TString& error = {})
- : Tag(tag)
- , ErrorReason(error)
- {}
+ return ss.Str();
+ }
};
struct TEvTestLoadInfoRequest
@@ -97,9 +80,9 @@ struct TEvDataShardLoad {
TEvTestLoadInfoResponse() = default;
TEvTestLoadInfoResponse(ui64 tag, const TString& data) {
- auto* info = Record.AddInfos();
- info->SetTag(tag);
- info->SetData(std::move(data));
+ auto* report = Record.AddReports();
+ report->SetTag(tag);
+ report->SetInfo(std::move(data));
}
};
};
diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
index 63136c080a..46da2960aa 100644
--- a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
@@ -357,10 +357,10 @@ private:
<< ", read# " << Oks);
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Oks;
- response->Report->OperationsError = 0;
+ auto& report = *response->Record.MutableReport();
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Oks);
+ report.SetOperationsError(0);
ctx.Send(Parent, response.release());
return Die(ctx);
@@ -397,7 +397,6 @@ private:
enum class EState {
DescribePath,
- Upsert,
FullScan,
FullScanGetKeys,
ReadHeadPoints,
@@ -429,7 +428,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce
size_t Oks = 0;
size_t Errors = 0;
- TVector<TEvDataShardLoad::TLoadReport> Results;
+ TVector<NKikimrDataShardLoad::TLoadReport> Results;
// accumulates results from read actors: between different inflights/chunks must be reset
NHdr::THistogram HeadReadsHist;
@@ -499,9 +498,6 @@ private:
case EState::DescribePath:
DescribePath(ctx);
return;
- case EState::Upsert:
- UpsertData(ctx);
- return;
case EState::FullScan:
RunFullScan(ctx, 0);
break;
@@ -553,32 +549,10 @@ private:
<< Target.GetWorkingDir() << "/" << Target.GetTableName()
<< " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size());
- State = EState::Upsert;
+ State = EState::FullScan;
Run(ctx);
}
- void UpsertData(const TActorContext& ctx) {
- NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart upsertConfig;
- upsertConfig.SetRowCount(Config.GetRowCount());
- upsertConfig.SetInflight(100); // some good value to upsert fast
-
- NKikimrDataShardLoad::TEvTestLoadRequest::TTargetShard target;
- target.SetTabletId(TabletId);
- target.SetTableId(TableId);
-
- auto* upsertActor = CreateUpsertBulkActor(
- upsertConfig,
- target,
- SelfId(),
- Counters,
- /* meaningless tag */ 1000000);
-
- StartedActors.emplace_back(ctx.Register(upsertActor));
-
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
- << " started upsert actor with id# " << StartedActors.back());
- }
-
void RunFullScan(const TActorContext& ctx, ui64 sampleKeys) {
auto request = std::make_unique<TEvDataShard::TEvRead>();
auto& record = request->Record;
@@ -613,31 +587,25 @@ private:
}
void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
- const auto* msg = ev->Get();
- if (msg->ErrorReason || !msg->Report) {
+ const auto& record = ev->Get()->Record;
+ if (record.HasErrorReason() || !record.HasReport()) {
TStringStream ss;
- ss << "read iterator actor# " << msg->Tag << " finished with error: " << msg->ErrorReason
+ ss << "read iterator actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason()
<< " in State# " << (int)State;
- if (msg->Report)
- ss << ", report: " << msg->Report->ToString();
+ if (record.HasReport())
+ ss << ", report: " << ev->Get()->ToString();
return StopWithError(ctx, ss.Str());
}
switch (State) {
- case EState::Upsert: {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "upsert actor# " << ev->Sender
- << " finished: " << msg->Report->ToString());
- State = EState::FullScan;
- return Run(ctx);
- }
case EState::FullScan: {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "fullscan actor# " << ev->Sender
<< " with chunkSize# " << ChunkSizes[ChunkIndex]
- << " finished: " << msg->Report->ToString());
- Errors += msg->Report->OperationsError;
- Oks += msg->Report->OperationsOK;
- Results.emplace_back(*msg->Report);
+ << " finished: " << ev->Get()->ToString());
+ Errors += record.GetReport().GetOperationsError();
+ Oks += record.GetReport().GetOperationsOK();
+ Results.emplace_back(record.GetReport());
auto& lastResult = Results.back();
TStringStream ss;
@@ -647,7 +615,7 @@ private:
} else {
ss << "inf";
}
- lastResult.PrefixInfo = ss.Str();
+ lastResult.SetPrefixInfo(ss.Str());
++ChunkIndex;
if (ChunkIndex == ChunkSizes.size())
@@ -660,16 +628,16 @@ private:
case EState::ReadHeadPoints: {
Y_VERIFY(Inflight == 0);
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex]
- << " finished: " << msg->Report->ToString());
- Errors += msg->Report->OperationsError;
- Oks += msg->Report->OperationsOK;
- Results.emplace_back(*msg->Report);
+ << " finished: " << ev->Get()->ToString());
+ Errors += record.GetReport().GetOperationsError();
+ Oks += record.GetReport().GetOperationsOK();
+ Results.emplace_back(record.GetReport());
auto& lastResult = Results.back();
TStringStream ss;
ss << "Test run# " << Results.size() << ", type# ReadHeadPoints with inflight# "
<< Inflights[InflightIndex];
- lastResult.PrefixInfo = ss.Str();
+ lastResult.SetPrefixInfo(ss.Str());
++InflightIndex;
if (InflightIndex == Inflights.size())
@@ -748,10 +716,10 @@ private:
auto delta = ts - StartTsSubTest;
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Inflights[InflightIndex] * ReadCount;
- response->Report->OperationsError = 0;
+ auto& report = *response->Record.MutableReport();
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Inflights[InflightIndex] * ReadCount);
+ report.SetOperationsError(0);
TStringStream ss;
i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0);
@@ -766,7 +734,7 @@ private:
<< "\n99.9%: " << v999
<< Endl;
- response->Report->Info = ss.Str();
+ report.SetInfo(ss.Str());
ctx.Send(SelfId(), response.release());
}
}
@@ -776,21 +744,22 @@ private:
auto delta = ts - StartTs;
auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag);
- response->Report = TEvDataShardLoad::TLoadReport();
- response->Report->Duration = delta;
- response->Report->OperationsOK = Oks;
- response->Report->OperationsError = 0;
+ auto& report = *response->Record.MutableReport();
+ report.SetTag(Tag);
+ report.SetDurationMs(delta.MilliSeconds());
+ report.SetOperationsOK(Oks);
+ report.SetOperationsError(0);
TStringStream ss;
for (const auto& report: Results) {
- ss << report.ToString() << Endl;
+ ss << report << Endl;
}
- response->Report->Info = ss.Str();
- response->Report->SubtestCount = Results.size();
+ report.SetInfo(ss.Str());
+ report.SetSubtestCount(Results.size());
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
- << " finished in " << delta << " with report:\n" << response->Report->Info);
+ << " finished in " << delta << " with report:\n" << report.GetInfo());
ctx.Send(Parent, response.release());
@@ -855,9 +824,6 @@ inline void Out<NKikimr::NDataShardLoad::EState>(IOutputStream& o, NKikimr::NDat
case NKikimr::NDataShardLoad::EState::DescribePath:
o << "describepath";
break;
- case NKikimr::NDataShardLoad::EState::Upsert:
- o << "upsert";
- break;
case NKikimr::NDataShardLoad::EState::FullScan:
o << "fullscan";
break;
@@ -872,3 +838,25 @@ inline void Out<NKikimr::NDataShardLoad::EState>(IOutputStream& o, NKikimr::NDat
break;
}
}
+
+template <>
+inline void Out<NKikimrDataShardLoad::TLoadReport>(IOutputStream& o, const NKikimrDataShardLoad::TLoadReport& report) {
+ if (report.HasPrefixInfo())
+ o << report.GetPrefixInfo() << ". ";
+
+ auto duration = TDuration::MilliSeconds(report.GetDurationMs());
+ o << "Load duration: " << duration
+ << ", OK=" << report.GetOperationsOK()
+ << ", Error=" << report.GetOperationsError();
+
+ if (report.GetOperationsOK() && duration.Seconds()) {
+ ui64 throughput = report.GetOperationsOK() / duration.Seconds();
+ o << ", throughput=" << throughput << " OK_ops/s";
+ }
+ if (report.HasSubtestCount()) {
+ o << ", subtests: " << report.GetSubtestCount();
+ }
+ if (report.HasInfo()) {
+ o << ", Info: " << report.GetInfo();
+ }
+}