diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-12-08 22:21:34 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-12-08 22:21:34 +0300 |
commit | 35e53e3bf1d4087b8c6fccfb6286001d94b5a68d (patch) | |
tree | 81f72384fc0208c7dbf697ca88b5d1d7795ca0cf | |
parent | 96b5ab9295df718b2f695b3ee1223d407d3de565 (diff) | |
download | ydb-35e53e3bf1d4087b8c6fccfb6286001d94b5a68d.tar.gz |
PR from branch users/eivanov89/-improve-load-actors
switch to protobuf in load finished responce
init load actors using batch upsert to speedup
move data upsert from read iterator to common warmup
proper enum serialization
-rw-r--r-- | ydb/core/protos/CMakeLists.txt | 5 | ||||
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 32 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp | 48 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/info_collector.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/kqp_upsert.cpp | 37 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 86 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.h | 61 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_read_iterator.cpp | 126 |
9 files changed, 233 insertions, 195 deletions
diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt index a4eebd1c08..138114d9d7 100644 --- a/ydb/core/protos/CMakeLists.txt +++ b/ydb/core/protos/CMakeLists.txt @@ -157,6 +157,11 @@ generate_enum_serilization(ydb-core-protos INCLUDE_HEADERS ydb/core/protos/blobstorage_pdisk_config.pb.h ) +generate_enum_serilization(ydb-core-protos + ${CMAKE_BINARY_DIR}/ydb/core/protos/datashard_load.pb.h + INCLUDE_HEADERS + ydb/core/protos/datashard_load.pb.h +) target_proto_addincls(ydb-core-protos ./ ${CMAKE_SOURCE_DIR}/ diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index d85bdea56d..a0f4f0ca22 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -27,6 +27,9 @@ message TEvTestLoadRequest { // special mode: actor writes RowCount rows again and again optional bool Infinite = 4; + + // only for bulk upsert + optional uint32 BatchSize = 5; } message TReadStart { @@ -90,15 +93,29 @@ message TEvTestLoadResponse { optional uint64 Tag = 4; } -message TLoadInfo { +message TLoadReport { optional uint64 Tag = 1; - optional string Data = 2; + + optional uint64 DurationMs = 2; + optional uint64 OperationsOK = 3; + optional uint64 OperationsError = 4; + + // info might contain result for multiple subtests + optional string Info = 5; + optional uint64 SubtestCount = 6; + optional string PrefixInfo = 7; } message TEvTestLoadInfoRequest { } -// collector puts Infos sorted by Tag +// collector puts Reports sorted by Tag message TEvTestLoadInfoResponse { - repeated TLoadInfo Infos = 1; + repeated TLoadReport Reports = 1; +} + +message TEvTestLoadFinished { + optional uint64 Tag = 1; + optional TLoadReport Report = 2; + optional string ErrorReason = 3; } diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index 2e04771a9b..4ee4b073c7 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -241,8 +241,8 @@ struct TTestHelper { runtime.GrabEdgeEventRethrow<TEvDataShardLoad::TEvTestLoadFinished>(handle); UNIT_ASSERT(handle); auto response = handle->Release<TEvDataShardLoad::TEvTestLoadFinished>(); - UNIT_ASSERT(response->Report); - UNIT_ASSERT(!response->ErrorReason); + UNIT_ASSERT(response->Record.HasReport()); + UNIT_ASSERT(!response->Record.HasErrorReason()); return std::unique_ptr<TEvDataShardLoad::TEvTestLoadFinished>(response.Release()); } @@ -292,6 +292,28 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); } + Y_UNIT_TEST(ShouldWriteDataBulkUpsertBatch) { + // same as ShouldWriteDataBulkUpsert, but with batch size + TTestHelper helper; + + const ui64 expectedRowCount = 100; + + std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); + auto& record = request->Record; + auto& command = *record.MutableUpsertBulkStart(); + + command.SetRowCount(expectedRowCount); + command.SetInflight(3); + command.SetBatchSize(7); + + auto& target = *record.MutableTargetShard(); + target.SetTabletId(helper.Table.TabletId); + target.SetTableId(helper.Table.UserTable.GetPathId()); + target.SetTableName(DefaultTableName); + + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); + } + Y_UNIT_TEST(ShouldWriteDataBulkUpsert2) { // check nondefault tablename TSettings settings("JustTable"); @@ -571,10 +593,10 @@ Y_UNIT_TEST_SUITE(ReadLoad) { setupTable.SetTableName("usertable"); auto result = helper.RunTestLoad(std::move(request)); - UNIT_ASSERT(result->Report); + UNIT_ASSERT(result->Record.HasReport()); - UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 4); - UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount)); + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReport().GetSubtestCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetReport().GetOperationsOK(), (4 * expectedRowCount)); // sanity check that there was data in table helper.CheckKeys(0, expectedRowCount); diff --git a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp index 857342722b..30a270ead5 100644 --- a/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp +++ b/ydb/core/tx/datashard/testload/bulk_mkql_upsert.cpp @@ -30,7 +30,7 @@ enum class ERequestType { UpsertLocalMkql, }; -TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) { +TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyStart, ui64 n) { TUploadRowsRequestPtr request(new TEvDataShard::TEvUploadRowsRequest()); auto& record = request->Record; record.SetTableId(tableId); @@ -41,20 +41,22 @@ TUploadRequest GenerateBulkRowRequest(ui64 tableId, ui64 keyNum) { } rowScheme.AddKeyColumnIds(1); - TVector<TCell> keys; - keys.reserve(1); - TString key = GetKey(keyNum); - keys.emplace_back(key.data(), key.size()); + for (size_t i = keyStart; i < keyStart + n; ++i) { + TVector<TCell> keys; + keys.reserve(1); + TString key = GetKey(i); + keys.emplace_back(key.data(), key.size()); - TVector<TCell> values; - values.reserve(10); - for (size_t i = 2; i <= 11; ++i) { - values.emplace_back(Value.data(), Value.size()); - } + TVector<TCell> values; + values.reserve(10); + for (size_t i = 2; i <= 11; ++i) { + values.emplace_back(Value.data(), Value.size()); + } - auto& row = *record.AddRows(); - row.SetKeyColumns(TSerializedCellVec::Serialize(keys)); - row.SetValueColumns(TSerializedCellVec::Serialize(values)); + auto& row = *record.AddRows(); + row.SetKeyColumns(TSerializedCellVec::Serialize(keys)); + row.SetValueColumns(TSerializedCellVec::Serialize(values)); + } return TUploadRequest(request.release()); } @@ -95,6 +97,7 @@ TRequestsVector GenerateRequests( ui64 tableId, ui64 keyFrom, ui64 n, + ui64 batchSize, // only bulk requests ERequestType requestType, const TString& table) { @@ -103,9 +106,12 @@ TRequestsVector GenerateRequests( for (size_t i = keyFrom; i < keyFrom + n; ++i) { switch (requestType) { - case ERequestType::UpsertBulk: - requests.emplace_back(GenerateBulkRowRequest(tableId, i)); + case ERequestType::UpsertBulk: { + auto keysLeft = keyFrom + n - i; + auto currentBatchSize = Max(Min(batchSize, keysLeft), ui64(1)); + requests.emplace_back(GenerateBulkRowRequest(tableId, i, currentBatchSize)); break; + } case ERequestType::UpsertLocalMkql: requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table)); break; @@ -168,6 +174,7 @@ public: Target.GetTableId(), Config.GetKeyFrom(), Config.GetRowCount(), + Config.GetBatchSize(), RequestType, Target.GetTableName()); @@ -221,11 +228,14 @@ private: } else if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Requests.size() - Errors; - response->Report->OperationsError = Errors; + auto& report = *response->Record.MutableReport(); + report.SetTag(Tag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Requests.size() - Errors); + report.SetOperationsError(Errors); + ctx.Send(Parent, response.release()); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Tag# " << Tag diff --git a/ydb/core/tx/datashard/testload/info_collector.cpp b/ydb/core/tx/datashard/testload/info_collector.cpp index 3e5e03fea1..f82b9e4c34 100644 --- a/ydb/core/tx/datashard/testload/info_collector.cpp +++ b/ydb/core/tx/datashard/testload/info_collector.cpp @@ -11,7 +11,7 @@ class TInfoCollector : public TActorBootstrapped<TInfoCollector> { const TActorId Parent; TVector<TActorId> Actors; - TMap<ui64, NKikimrDataShardLoad::TLoadInfo> Results; + TMap<ui64, NKikimrDataShardLoad::TLoadReport> Results; ui64 ResponsesPending = 0; @@ -35,8 +35,8 @@ public: void Handle(TEvDataShardLoad::TEvTestLoadInfoResponse::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; - Y_VERIFY(record.InfosSize() == 1); - Results[record.GetInfos(0).GetTag()] = std::move(record.GetInfos(0)); + Y_VERIFY(record.ReportsSize() == 1); + Results[record.GetReports(0).GetTag()] = std::move(record.GetReports(0)); --ResponsesPending; if (ResponsesPending == 0) { @@ -48,7 +48,7 @@ public: void Reply(const TActorContext& ctx) { auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>(); for (auto& it: Results) { - *response->Record.AddInfos() = std::move(it.second); + *response->Record.AddReports() = std::move(it.second); } ctx.Send(Parent, response.release()); diff --git a/ydb/core/tx/datashard/testload/kqp_upsert.cpp b/ydb/core/tx/datashard/testload/kqp_upsert.cpp index a9a8f735bd..b8ab23c5d2 100644 --- a/ydb/core/tx/datashard/testload/kqp_upsert.cpp +++ b/ydb/core/tx/datashard/testload/kqp_upsert.cpp @@ -167,11 +167,14 @@ private: } else if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Requests.size() - Errors; - response->Report->OperationsError = Errors; + auto& report = *response->Record.MutableReport(); + report.SetTag(Tag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Requests.size() - Errors); + report.SetOperationsError(Errors); + ctx.Send(Parent, response.release()); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Tag @@ -348,31 +351,33 @@ private: } void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); - if (msg->ErrorReason || !msg->Report) { + const auto& record = ev->Get()->Record; + if (record.HasErrorReason() || !record.HasReport()) { TStringStream ss; - ss << "kqp actor# " << msg->Tag << " finished with error: " << msg->ErrorReason; - if (msg->Report) - ss << ", report: " << msg->Report->ToString(); + ss << "kqp actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason(); + if (record.HasReport()) + ss << ", report: " << ev->Get()->ToString(); StopWithError(ctx, ss.Str()); return; } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << msg->Report->ToString()); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Tag << " finished: " << ev->Get()->ToString()); - Errors += msg->Report->OperationsError; - Oks += msg->Report->OperationsOK; + Errors += record.GetReport().GetOperationsError(); + Oks += record.GetReport().GetOperationsOK(); --Inflight; if (Inflight == 0) { EndTs = TInstant::Now(); auto delta = EndTs - StartTs; + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Oks; - response->Report->OperationsError = Errors; + auto& report = *response->Record.MutableReport(); + report.SetTag(Tag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Oks); + report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Tag diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index 7ef8eb1025..a3ee50ba1f 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -33,7 +33,7 @@ struct TFinishedTestInfo { ui64 Tag; TString ErrorReason; TInstant FinishTime; - std::optional<TEvDataShardLoad::TLoadReport> Report; + NKikimrDataShardLoad::TLoadReport Report; }; // TLoad @@ -333,6 +333,9 @@ public: case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: cmd = Request.GetUpsertProposeStart(); break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kReadIteratorStart: + cmd.SetRowCount(Request.GetReadIteratorStart().GetRowCount()); + break; default: State = EState::RunLoad; return PrepareTable(ctx); @@ -344,8 +347,8 @@ public: << " in dir# " << target.GetWorkingDir() << " with rows# " << cmd.GetRowCount()); - // TODO: we need bulk upsert with normal batch size, not 1 row per request cmd.SetInflight(100); + cmd.SetBatchSize(100); LoadActors.insert(ctx.Register( CreateUpsertBulkActor( @@ -405,7 +408,7 @@ public: break; default: { TStringStream ss; - ss << "TLoad: unexpected command case# " << ui32(Request.Command_case()) + ss << "TLoad: unexpected command case# " << Request.Command_case() << ", proto# " << Request.DebugString(); StopWithError(ctx, ss.Str()); return; @@ -413,18 +416,18 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " created load actor of type# " - << ui32(Request.Command_case())<< " with tag# " << tag << ", proto# " << Request.DebugString()); + << Request.Command_case() << " with tag# " << tag << ", proto# " << Request.DebugString()); LoadActors.insert(ctx.Register(actor.release())); } void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { - const auto& msg = ev->Get(); + const auto& record = ev->Get()->Record; LoadActors.erase(ev->Sender); - if (msg->ErrorReason || !msg->Report) { + if (record.HasErrorReason() || !record.HasReport()) { TStringStream ss; - ss << "error from actor# " << ev->Sender << " with tag# " << msg->Tag; + ss << "error from actor# " << ev->Sender << " with tag# " << record.GetTag(); StopWithError(ctx, ss.Str()); return; } @@ -435,9 +438,9 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag - << " received finished from actor# " << ev->Sender << " with tag# " << msg->Tag); + << " received finished from actor# " << ev->Sender << " with tag# " << record.GetTag()); - FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report}); + FinishedTests.push_back({record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()}); if (LoadActors.empty()) { Finish(ctx); @@ -448,21 +451,27 @@ public: void Finish(const TActorContext& ctx) { auto endTs = TAppData::TimeProvider->Now(); - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(); - response->Tag = Tag; - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = endTs - StartTs; + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); + auto& report = *response->Record.MutableReport(); + report.SetTag(Tag); + report.SetDurationMs((endTs - StartTs).MilliSeconds()); + ui64 oks = 0; + ui64 errors = 0; + ui64 subtestCount = 0; TStringStream ss; for (const auto& test: FinishedTests) { - Y_VERIFY(test.Report); - response->Report->OperationsOK += test.Report->OperationsOK; - response->Report->OperationsError += test.Report->OperationsError; - response->Report->SubtestCount += test.Report->SubtestCount; - if (test.Report->Info) - ss << test.Report->Info << Endl; + oks += test.Report.GetOperationsOK(); + errors += test.Report.GetOperationsError(); + subtestCount += test.Report.GetSubtestCount(); + if (test.Report.HasInfo()) + ss << test.Report.GetInfo() << Endl; } + report.SetOperationsOK(oks); + report.SetOperationsError(errors); + report.SetSubtestCount(subtestCount); + ctx.Send(Parent, response.release()); Die(ctx); } @@ -475,9 +484,9 @@ public: << ", in state# " << State; auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>(); - auto* info = response->Record.AddInfos(); + auto* info = response->Record.AddReports(); info->SetTag(Tag); - info->SetData(ss.Str()); + info->SetInfo(ss.Str()); ctx.Send(ev->Sender, response.release()); return; } @@ -501,14 +510,14 @@ public: << ", finished# " << FinishedTests.size() << ", subactors infos: "; - for (auto& info: ev->Get()->Record.GetInfos()) { - ss << "{ tag: " << info.GetTag() << ", data: " << info.GetData() << " }"; + for (auto& info: ev->Get()->Record.GetReports()) { + ss << "{ tag: " << info.GetTag() << ", info: " << info.GetInfo() << " }"; } NKikimrDataShardLoad::TEvTestLoadInfoResponse record; - auto* info = record.AddInfos(); - info->SetTag(Tag); - info->SetData(ss.Str()); + auto* report = record.AddReports(); + report->SetTag(Tag); + report->SetInfo(ss.Str()); for (const auto& actorId: HttpInfoWaiters) { auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadInfoResponse>(); @@ -674,23 +683,23 @@ public: } void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { - const auto& msg = ev->Get(); - auto it = LoadActors.find(msg->Tag); - Y_VERIFY(it != LoadActors.end(), "%s", (TStringBuilder() << "failed to find actor with tag# " << msg->Tag + const auto& record = ev->Get()->Record; + auto it = LoadActors.find(record.GetTag()); + + Y_VERIFY(it != LoadActors.end(), "%s", (TStringBuilder() << "failed to find actor with tag# " << record.GetTag() << ", TEvTestLoadFinished from actor# " << ev->Sender).c_str()); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load actor# " << ev->Sender - << " with tag# " << msg->Tag << " finished"); + << " with tag# " << record.GetTag() << " finished"); if (it->second.Parent) { - auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(); - response->Tag = ev->Get()->Tag; - response->ErrorReason = ev->Get()->ErrorReason; - response->Report = ev->Get()->Report; + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(record.GetTag()); + response->Record = record; ctx.Send(it->second.Parent, response.release()); } LoadActors.erase(it); - FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report}); + FinishedTests.push_back( + {record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()}); } void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { @@ -727,13 +736,13 @@ public: } } - for (const auto& info: record.GetInfos()) { + for (const auto& info: record.GetReports()) { DIV_CLASS("panel panel-info") { DIV_CLASS("panel-heading") { str << "Tag# " << info.GetTag(); } DIV_CLASS("panel-body") { - str << info.GetData(); + str << info.GetInfo(); } } } @@ -746,8 +755,7 @@ public: } DIV_CLASS("panel-body") { str << "<p>"; - if (req.Report) - str << "Report# " << req.Report->ToString() << "<br/>"; + str << "Report# " << req.Report << "<br/>"; str << "Finish reason# " << req.ErrorReason << "<br/>"; str << "Finish time# " << req.FinishTime << "<br/>"; str << "</p>"; diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h index 269fb22864..f6ac06af3e 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.h +++ b/ydb/core/tx/datashard/testload/test_load_actor.h @@ -36,49 +36,32 @@ struct TEvDataShardLoad { TEvTestLoadResponse() = default; }; - struct TLoadReport { - TDuration Duration; - ui64 OperationsOK = 0; - ui64 OperationsError = 0; + struct TEvTestLoadFinished + : public TEventPB<TEvTestLoadFinished, + NKikimrDataShardLoad::TEvTestLoadFinished, + EvTestLoadFinished> { - // info might contain result for multiple subtests - TString Info; - ui64 SubtestCount = 0; + TEvTestLoadFinished() = default; - // used by test launchers to specify params and test number - TString PrefixInfo; + TEvTestLoadFinished(ui64 tag, const TString& error = {}) + { + Record.SetTag(tag); + if (error) + Record.SetErrorReason(error); + } TString ToString() const { TStringStream ss; - if (PrefixInfo) - ss << PrefixInfo << ". "; - - ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError; - if (OperationsOK && Duration.Seconds()) { - ui64 throughput = OperationsOK / Duration.Seconds(); - ss << ", throughput=" << throughput << " OK_ops/s"; + ss << Record.GetTag(); + if (Record.HasErrorReason()) { + ss << " failed: " << Record.GetErrorReason(); + } else { + const auto& report = Record.GetReport(); + ss << " " << report; } - if (SubtestCount) { - ss << ", subtests: " << SubtestCount; - } - if (Info) { - ss << ", Info: " << Info; - } - return ss.Str(); - } - }; - - struct TEvTestLoadFinished : public TEventLocal<TEvTestLoadFinished, EvTestLoadFinished> { - ui64 Tag; - std::optional<TLoadReport> Report; - TString ErrorReason; - - TEvTestLoadFinished() = default; - TEvTestLoadFinished(ui64 tag, const TString& error = {}) - : Tag(tag) - , ErrorReason(error) - {} + return ss.Str(); + } }; struct TEvTestLoadInfoRequest @@ -97,9 +80,9 @@ struct TEvDataShardLoad { TEvTestLoadInfoResponse() = default; TEvTestLoadInfoResponse(ui64 tag, const TString& data) { - auto* info = Record.AddInfos(); - info->SetTag(tag); - info->SetData(std::move(data)); + auto* report = Record.AddReports(); + report->SetTag(tag); + report->SetInfo(std::move(data)); } }; }; diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp index 63136c080a..46da2960aa 100644 --- a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp +++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp @@ -357,10 +357,10 @@ private: << ", read# " << Oks); auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Oks; - response->Report->OperationsError = 0; + auto& report = *response->Record.MutableReport(); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Oks); + report.SetOperationsError(0); ctx.Send(Parent, response.release()); return Die(ctx); @@ -397,7 +397,6 @@ private: enum class EState { DescribePath, - Upsert, FullScan, FullScanGetKeys, ReadHeadPoints, @@ -429,7 +428,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce size_t Oks = 0; size_t Errors = 0; - TVector<TEvDataShardLoad::TLoadReport> Results; + TVector<NKikimrDataShardLoad::TLoadReport> Results; // accumulates results from read actors: between different inflights/chunks must be reset NHdr::THistogram HeadReadsHist; @@ -499,9 +498,6 @@ private: case EState::DescribePath: DescribePath(ctx); return; - case EState::Upsert: - UpsertData(ctx); - return; case EState::FullScan: RunFullScan(ctx, 0); break; @@ -553,32 +549,10 @@ private: << Target.GetWorkingDir() << "/" << Target.GetTableName() << " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size()); - State = EState::Upsert; + State = EState::FullScan; Run(ctx); } - void UpsertData(const TActorContext& ctx) { - NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart upsertConfig; - upsertConfig.SetRowCount(Config.GetRowCount()); - upsertConfig.SetInflight(100); // some good value to upsert fast - - NKikimrDataShardLoad::TEvTestLoadRequest::TTargetShard target; - target.SetTabletId(TabletId); - target.SetTableId(TableId); - - auto* upsertActor = CreateUpsertBulkActor( - upsertConfig, - target, - SelfId(), - Counters, - /* meaningless tag */ 1000000); - - StartedActors.emplace_back(ctx.Register(upsertActor)); - - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag - << " started upsert actor with id# " << StartedActors.back()); - } - void RunFullScan(const TActorContext& ctx, ui64 sampleKeys) { auto request = std::make_unique<TEvDataShard::TEvRead>(); auto& record = request->Record; @@ -613,31 +587,25 @@ private: } void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); - if (msg->ErrorReason || !msg->Report) { + const auto& record = ev->Get()->Record; + if (record.HasErrorReason() || !record.HasReport()) { TStringStream ss; - ss << "read iterator actor# " << msg->Tag << " finished with error: " << msg->ErrorReason + ss << "read iterator actor# " << record.GetTag() << " finished with error: " << record.GetErrorReason() << " in State# " << (int)State; - if (msg->Report) - ss << ", report: " << msg->Report->ToString(); + if (record.HasReport()) + ss << ", report: " << ev->Get()->ToString(); return StopWithError(ctx, ss.Str()); } switch (State) { - case EState::Upsert: { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "upsert actor# " << ev->Sender - << " finished: " << msg->Report->ToString()); - State = EState::FullScan; - return Run(ctx); - } case EState::FullScan: { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "fullscan actor# " << ev->Sender << " with chunkSize# " << ChunkSizes[ChunkIndex] - << " finished: " << msg->Report->ToString()); - Errors += msg->Report->OperationsError; - Oks += msg->Report->OperationsOK; - Results.emplace_back(*msg->Report); + << " finished: " << ev->Get()->ToString()); + Errors += record.GetReport().GetOperationsError(); + Oks += record.GetReport().GetOperationsOK(); + Results.emplace_back(record.GetReport()); auto& lastResult = Results.back(); TStringStream ss; @@ -647,7 +615,7 @@ private: } else { ss << "inf"; } - lastResult.PrefixInfo = ss.Str(); + lastResult.SetPrefixInfo(ss.Str()); ++ChunkIndex; if (ChunkIndex == ChunkSizes.size()) @@ -660,16 +628,16 @@ private: case EState::ReadHeadPoints: { Y_VERIFY(Inflight == 0); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex] - << " finished: " << msg->Report->ToString()); - Errors += msg->Report->OperationsError; - Oks += msg->Report->OperationsOK; - Results.emplace_back(*msg->Report); + << " finished: " << ev->Get()->ToString()); + Errors += record.GetReport().GetOperationsError(); + Oks += record.GetReport().GetOperationsOK(); + Results.emplace_back(record.GetReport()); auto& lastResult = Results.back(); TStringStream ss; ss << "Test run# " << Results.size() << ", type# ReadHeadPoints with inflight# " << Inflights[InflightIndex]; - lastResult.PrefixInfo = ss.Str(); + lastResult.SetPrefixInfo(ss.Str()); ++InflightIndex; if (InflightIndex == Inflights.size()) @@ -748,10 +716,10 @@ private: auto delta = ts - StartTsSubTest; auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Inflights[InflightIndex] * ReadCount; - response->Report->OperationsError = 0; + auto& report = *response->Record.MutableReport(); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Inflights[InflightIndex] * ReadCount); + report.SetOperationsError(0); TStringStream ss; i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0); @@ -766,7 +734,7 @@ private: << "\n99.9%: " << v999 << Endl; - response->Report->Info = ss.Str(); + report.SetInfo(ss.Str()); ctx.Send(SelfId(), response.release()); } } @@ -776,21 +744,22 @@ private: auto delta = ts - StartTs; auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(Tag); - response->Report = TEvDataShardLoad::TLoadReport(); - response->Report->Duration = delta; - response->Report->OperationsOK = Oks; - response->Report->OperationsError = 0; + auto& report = *response->Record.MutableReport(); + report.SetTag(Tag); + report.SetDurationMs(delta.MilliSeconds()); + report.SetOperationsOK(Oks); + report.SetOperationsError(0); TStringStream ss; for (const auto& report: Results) { - ss << report.ToString() << Endl; + ss << report << Endl; } - response->Report->Info = ss.Str(); - response->Report->SubtestCount = Results.size(); + report.SetInfo(ss.Str()); + report.SetSubtestCount(Results.size()); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag - << " finished in " << delta << " with report:\n" << response->Report->Info); + << " finished in " << delta << " with report:\n" << report.GetInfo()); ctx.Send(Parent, response.release()); @@ -855,9 +824,6 @@ inline void Out<NKikimr::NDataShardLoad::EState>(IOutputStream& o, NKikimr::NDat case NKikimr::NDataShardLoad::EState::DescribePath: o << "describepath"; break; - case NKikimr::NDataShardLoad::EState::Upsert: - o << "upsert"; - break; case NKikimr::NDataShardLoad::EState::FullScan: o << "fullscan"; break; @@ -872,3 +838,25 @@ inline void Out<NKikimr::NDataShardLoad::EState>(IOutputStream& o, NKikimr::NDat break; } } + +template <> +inline void Out<NKikimrDataShardLoad::TLoadReport>(IOutputStream& o, const NKikimrDataShardLoad::TLoadReport& report) { + if (report.HasPrefixInfo()) + o << report.GetPrefixInfo() << ". "; + + auto duration = TDuration::MilliSeconds(report.GetDurationMs()); + o << "Load duration: " << duration + << ", OK=" << report.GetOperationsOK() + << ", Error=" << report.GetOperationsError(); + + if (report.GetOperationsOK() && duration.Seconds()) { + ui64 throughput = report.GetOperationsOK() / duration.Seconds(); + o << ", throughput=" << throughput << " OK_ops/s"; + } + if (report.HasSubtestCount()) { + o << ", subtests: " << report.GetSubtestCount(); + } + if (report.HasInfo()) { + o << ", Info: " << report.GetInfo(); + } +} |