diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-26 21:21:16 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-26 21:21:16 +0300 |
commit | 141fe23a8ace89f84fd7b4c7aac8cca332608c3a (patch) | |
tree | a2c2b5db0562e5372662f6486f594679cd37ffaf | |
parent | 7881b02bb3ef2f12d7abc3c8ff505ac451e6190c (diff) | |
download | ydb-141fe23a8ace89f84fd7b4c7aac8cca332608c3a.tar.gz |
run read iterator load with different inflights
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_read_iterator.cpp | 132 |
4 files changed, 108 insertions, 36 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index edb71cf82d0..e88e6e76096 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -27,8 +27,10 @@ message TEvTestLoadRequest { // defines dataset size, normally must be withing 2 GiB optional uint64 RowCount = 2; + repeated uint32 Inflights = 3; + // Specifies the format for result data in TEvReadResult - optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 3; + optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4; } optional uint64 Cookie = 1; diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index eebc1f4dec9..27e95c72836 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(ReadLoad) { // fullscans with different chunks: 5 // read head with inflight 1 - UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 6); + UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 12); // sanity check that there was data in table helper.CheckKeysCount(expectedRowCount); diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h index 053fe433d7d..98e21d4b99b 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.h +++ b/ydb/core/tx/datashard/testload/test_load_actor.h @@ -42,8 +42,14 @@ struct TEvDataShardLoad { TString Info; ui64 SubtestCount = 0; + // used by test launchers to specify params and test number + TString PrefixInfo; + TString ToString() const { TStringStream ss; + if (PrefixInfo) + ss << PrefixInfo << ". "; + ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError; if (OperationsOK && Duration.Seconds()) { ui64 throughput = OperationsOK / Duration.Seconds(); diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp index 11752bd8d3b..30b86bc9ef6 100644 --- a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp +++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp @@ -86,6 +86,8 @@ void AddKeyQuery( request.Keys.emplace_back(buf); } +// TReadIteratorPoints + class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { const std::unique_ptr<const TEvDataShard::TEvRead> BaseRequest; const NKikimrTxDataShard::EScanDataFormat Format; @@ -233,6 +235,8 @@ private: ) }; +// TReadIteratorScan + class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { std::unique_ptr<TEvDataShard::TEvRead> Request; const NKikimrTxDataShard::EScanDataFormat Format; @@ -379,6 +383,8 @@ private: ) }; +// TReadIteratorLoadScenario + enum class EState { DescribePath, Upsert, @@ -393,6 +399,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; const ui64 Tag; + // used to measure full run of this actor TInstant StartTs; TString ConfingString; @@ -412,16 +419,20 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce size_t Oks = 0; size_t Errors = 0; TVector<TEvDataShardLoad::TLoadReport> Results; + + // accumulates results from read actors: between different inflights/chunks must be reset NHdr::THistogram HeadReadsHist; + TInstant StartTsSubTest; EState State = EState::DescribePath; + ui64 Inflight = 0; // setup for fullscan - const TVector<ui64> ChunkSizes = {0, 1, 10, 100, 1000}; + const TVector<ui64> ChunkSizes = {0, 0, 1, 1, 10, 10, 100, 100, 1000, 1000}; // each twice intentionally size_t ChunkIndex = 0; - // setup for all load tests - const TVector<ui64> Inflights = {1, 2, 10, 50, 100, 200, 400 }; + // note that might be overwritten by test incoming test config + TVector<ui64> Inflights = {1, 2, 10, 50, 100, 200, 400}; size_t InflightIndex = 0; public: @@ -434,6 +445,13 @@ public: , HeadReadsHist(1000, 4) { google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); + + if (Config.InflightsSize()) { + Inflights.clear(); + for (auto inflight: Config.GetInflights()) { + Inflights.push_back(inflight); + } + } } void Bootstrap(const TActorContext& ctx) { @@ -441,15 +459,11 @@ public: << " with tag# " << Tag << " Bootstrap called: " << ConfingString); Become(&TReadIteratorLoadScenario::StateFunc); + StartTs = TInstant::Now(); Run(ctx); } private: - void ResetIndices() { - ChunkIndex = 0; - InflightIndex = 0; - } - void Run(const TActorContext& ctx) { switch (State) { case EState::DescribePath: @@ -575,8 +589,7 @@ private: LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "upsert actor# " << ev->Sender << " finished: " << msg->Report->ToString()); State = EState::FullScan; - Run(ctx); - return; + return Run(ctx); } case EState::FullScan: { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "fullscan actor# " << ev->Sender @@ -586,17 +599,44 @@ private: Oks += msg->Report->OperationsOK; Results.emplace_back(*msg->Report); + auto& lastResult = Results.back(); + TStringStream ss; + ss << "Test run# " << Results.size() << ", type# FullScan with chunk# "; + if (ChunkSizes[ChunkIndex]) { + ss << ChunkSizes[ChunkIndex]; + } else { + ss << "inf"; + } + lastResult.PrefixInfo = ss.Str(); + ++ChunkIndex; if (ChunkIndex == ChunkSizes.size()) State = EState::FullScanGetKeys; - Run(ctx); - return; + return Run(ctx); } case EState::DescribePath: case EState::FullScanGetKeys: return StopWithError(ctx, TStringBuilder() << "TEvTestLoadFinished while in " << State); - case EState::ReadHeadPoints: - break; + case EState::ReadHeadPoints: { + Y_VERIFY(Inflight == 0); + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex] + << " finished: " << msg->Report->ToString()); + Errors += msg->Report->OperationsError; + Oks += msg->Report->OperationsOK; + Results.emplace_back(*msg->Report); + + auto& lastResult = Results.back(); + TStringStream ss; + ss << "Test run# " << Results.size() << ", type# ReadHeadPoints with inflight# " + << Inflights[InflightIndex]; + lastResult.PrefixInfo = ss.Str(); + + ++InflightIndex; + if (InflightIndex == Inflights.size()) + return Finish(ctx); + + return Run(ctx); + } } } @@ -611,6 +651,18 @@ private: } void RunHeadReads(const TActorContext& ctx) { + Y_VERIFY(Inflight == 0); + Y_VERIFY(InflightIndex < Inflights.size()); + + HeadReadsHist.Reset(); + StartTsSubTest = TInstant::Now(); + + Inflight = Inflights[InflightIndex]; + for (size_t i = 0; i < Inflight; ++i) + RunSingleHeadRead(ctx); + } + + void RunSingleHeadRead(const TActorContext& ctx) { auto request = std::make_unique<TEvDataShard::TEvRead>(); auto& record = request->Record; @@ -624,28 +676,53 @@ private: record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC); - auto* readActor = new TReadIteratorPoints(request.release(), TabletId, SelfId(), Keys); StartedActors.emplace_back(ctx.Register(readActor)); - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag << " started read actor with id# " << StartedActors.back()); } void Handle(TEvPrivate::TEvPointTimes::TPtr& ev, const TActorContext& ctx) { + --Inflight; + const auto& requestTimes = ev->Get()->RequestTimes; LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag - << " received point times# " << requestTimes.size()); + << " received point times# " << requestTimes.size() << ", Inflight left# " << Inflight); for (auto t: requestTimes) { auto ms = t.MilliSeconds(); if (ms == 0) - ms = 1; // round + ms = 1; // round up HeadReadsHist.RecordValue(ms); } - Finish(ctx); - return; + if (Inflight == 0) { + auto ts = TInstant::Now(); + auto delta = ts - StartTsSubTest; + + auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0); + response->Report = TEvDataShardLoad::TLoadReport(); + response->Report->Duration = delta; + response->Report->OperationsOK = Inflights[InflightIndex] * Config.GetRowCount(); + response->Report->OperationsError = 0; + + TStringStream ss; + i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0); + i64 v95 = HeadReadsHist.GetValueAtPercentile(95.00); + i64 v99 = HeadReadsHist.GetValueAtPercentile(99.00); + i64 v999 = HeadReadsHist.GetValueAtPercentile(99.9); + + ss << "single row head read hist (ms):" + << "\n50%: " << v50 + << "\n95%: " << v95 + << "\n99%: " << v99 + << "\n99.9%: " << v999 + << Endl; + + response->Report->Info = ss.Str(); + ctx.Send(SelfId(), response.release()); + } } void Finish(const TActorContext& ctx) { @@ -663,21 +740,8 @@ private: ss << report.ToString() << Endl; } - { - i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0); - i64 v95 = HeadReadsHist.GetValueAtPercentile(95.00); - i64 v99 = HeadReadsHist.GetValueAtPercentile(99.00); - i64 v999 = HeadReadsHist.GetValueAtPercentile(99.9); - - ss << "HeadReads (ms): " << "50%: " << v50 - << "95%: " << v95 - << "99%: " << v99 - << "99.9%: " << v999 - << Endl; - } - response->Report->Info = ss.Str(); - response->Report->SubtestCount = Results.size() + 1; + response->Report->SubtestCount = Results.size(); LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag << " finished in " << delta << " with report:\n" << response->Report->Info); |