aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-26 21:21:16 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-26 21:21:16 +0300
commit141fe23a8ace89f84fd7b4c7aac8cca332608c3a (patch)
treea2c2b5db0562e5372662f6486f594679cd37ffaf
parent7881b02bb3ef2f12d7abc3c8ff505ac451e6190c (diff)
downloadydb-141fe23a8ace89f84fd7b4c7aac8cca332608c3a.tar.gz
run read iterator load with different inflights
-rw-r--r--ydb/core/protos/datashard_load.proto4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp2
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.h6
-rw-r--r--ydb/core/tx/datashard/testload/test_load_read_iterator.cpp132
4 files changed, 108 insertions, 36 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index edb71cf82d0..e88e6e76096 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -27,8 +27,10 @@ message TEvTestLoadRequest {
// defines dataset size, normally must be withing 2 GiB
optional uint64 RowCount = 2;
+ repeated uint32 Inflights = 3;
+
// Specifies the format for result data in TEvReadResult
- optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 3;
+ optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 4;
}
optional uint64 Cookie = 1;
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index eebc1f4dec9..27e95c72836 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
// fullscans with different chunks: 5
// read head with inflight 1
- UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 6);
+ UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 12);
// sanity check that there was data in table
helper.CheckKeysCount(expectedRowCount);
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.h b/ydb/core/tx/datashard/testload/test_load_actor.h
index 053fe433d7d..98e21d4b99b 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.h
+++ b/ydb/core/tx/datashard/testload/test_load_actor.h
@@ -42,8 +42,14 @@ struct TEvDataShardLoad {
TString Info;
ui64 SubtestCount = 0;
+ // used by test launchers to specify params and test number
+ TString PrefixInfo;
+
TString ToString() const {
TStringStream ss;
+ if (PrefixInfo)
+ ss << PrefixInfo << ". ";
+
ss << "Load duration: " << Duration << ", OK=" << OperationsOK << ", Error=" << OperationsError;
if (OperationsOK && Duration.Seconds()) {
ui64 throughput = OperationsOK / Duration.Seconds();
diff --git a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
index 11752bd8d3b..30b86bc9ef6 100644
--- a/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_read_iterator.cpp
@@ -86,6 +86,8 @@ void AddKeyQuery(
request.Keys.emplace_back(buf);
}
+// TReadIteratorPoints
+
class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
const std::unique_ptr<const TEvDataShard::TEvRead> BaseRequest;
const NKikimrTxDataShard::EScanDataFormat Format;
@@ -233,6 +235,8 @@ private:
)
};
+// TReadIteratorScan
+
class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
std::unique_ptr<TEvDataShard::TEvRead> Request;
const NKikimrTxDataShard::EScanDataFormat Format;
@@ -379,6 +383,8 @@ private:
)
};
+// TReadIteratorLoadScenario
+
enum class EState {
DescribePath,
Upsert,
@@ -393,6 +399,7 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
const ui64 Tag;
+ // used to measure full run of this actor
TInstant StartTs;
TString ConfingString;
@@ -412,16 +419,20 @@ class TReadIteratorLoadScenario : public TActorBootstrapped<TReadIteratorLoadSce
size_t Oks = 0;
size_t Errors = 0;
TVector<TEvDataShardLoad::TLoadReport> Results;
+
+ // accumulates results from read actors: between different inflights/chunks must be reset
NHdr::THistogram HeadReadsHist;
+ TInstant StartTsSubTest;
EState State = EState::DescribePath;
+ ui64 Inflight = 0;
// setup for fullscan
- const TVector<ui64> ChunkSizes = {0, 1, 10, 100, 1000};
+ const TVector<ui64> ChunkSizes = {0, 0, 1, 1, 10, 10, 100, 100, 1000, 1000}; // each twice intentionally
size_t ChunkIndex = 0;
- // setup for all load tests
- const TVector<ui64> Inflights = {1, 2, 10, 50, 100, 200, 400 };
+ // note that might be overwritten by test incoming test config
+ TVector<ui64> Inflights = {1, 2, 10, 50, 100, 200, 400};
size_t InflightIndex = 0;
public:
@@ -434,6 +445,13 @@ public:
, HeadReadsHist(1000, 4)
{
google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
+
+ if (Config.InflightsSize()) {
+ Inflights.clear();
+ for (auto inflight: Config.GetInflights()) {
+ Inflights.push_back(inflight);
+ }
+ }
}
void Bootstrap(const TActorContext& ctx) {
@@ -441,15 +459,11 @@ public:
<< " with tag# " << Tag << " Bootstrap called: " << ConfingString);
Become(&TReadIteratorLoadScenario::StateFunc);
+ StartTs = TInstant::Now();
Run(ctx);
}
private:
- void ResetIndices() {
- ChunkIndex = 0;
- InflightIndex = 0;
- }
-
void Run(const TActorContext& ctx) {
switch (State) {
case EState::DescribePath:
@@ -575,8 +589,7 @@ private:
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "upsert actor# " << ev->Sender
<< " finished: " << msg->Report->ToString());
State = EState::FullScan;
- Run(ctx);
- return;
+ return Run(ctx);
}
case EState::FullScan: {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "fullscan actor# " << ev->Sender
@@ -586,17 +599,44 @@ private:
Oks += msg->Report->OperationsOK;
Results.emplace_back(*msg->Report);
+ auto& lastResult = Results.back();
+ TStringStream ss;
+ ss << "Test run# " << Results.size() << ", type# FullScan with chunk# ";
+ if (ChunkSizes[ChunkIndex]) {
+ ss << ChunkSizes[ChunkIndex];
+ } else {
+ ss << "inf";
+ }
+ lastResult.PrefixInfo = ss.Str();
+
++ChunkIndex;
if (ChunkIndex == ChunkSizes.size())
State = EState::FullScanGetKeys;
- Run(ctx);
- return;
+ return Run(ctx);
}
case EState::DescribePath:
case EState::FullScanGetKeys:
return StopWithError(ctx, TStringBuilder() << "TEvTestLoadFinished while in " << State);
- case EState::ReadHeadPoints:
- break;
+ case EState::ReadHeadPoints: {
+ Y_VERIFY(Inflight == 0);
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex]
+ << " finished: " << msg->Report->ToString());
+ Errors += msg->Report->OperationsError;
+ Oks += msg->Report->OperationsOK;
+ Results.emplace_back(*msg->Report);
+
+ auto& lastResult = Results.back();
+ TStringStream ss;
+ ss << "Test run# " << Results.size() << ", type# ReadHeadPoints with inflight# "
+ << Inflights[InflightIndex];
+ lastResult.PrefixInfo = ss.Str();
+
+ ++InflightIndex;
+ if (InflightIndex == Inflights.size())
+ return Finish(ctx);
+
+ return Run(ctx);
+ }
}
}
@@ -611,6 +651,18 @@ private:
}
void RunHeadReads(const TActorContext& ctx) {
+ Y_VERIFY(Inflight == 0);
+ Y_VERIFY(InflightIndex < Inflights.size());
+
+ HeadReadsHist.Reset();
+ StartTsSubTest = TInstant::Now();
+
+ Inflight = Inflights[InflightIndex];
+ for (size_t i = 0; i < Inflight; ++i)
+ RunSingleHeadRead(ctx);
+ }
+
+ void RunSingleHeadRead(const TActorContext& ctx) {
auto request = std::make_unique<TEvDataShard::TEvRead>();
auto& record = request->Record;
@@ -624,28 +676,53 @@ private:
record.SetResultFormat(::NKikimrTxDataShard::EScanDataFormat::CELLVEC);
-
auto* readActor = new TReadIteratorPoints(request.release(), TabletId, SelfId(), Keys);
StartedActors.emplace_back(ctx.Register(readActor));
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
<< " started read actor with id# " << StartedActors.back());
}
void Handle(TEvPrivate::TEvPointTimes::TPtr& ev, const TActorContext& ctx) {
+ --Inflight;
+
const auto& requestTimes = ev->Get()->RequestTimes;
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
- << " received point times# " << requestTimes.size());
+ << " received point times# " << requestTimes.size() << ", Inflight left# " << Inflight);
for (auto t: requestTimes) {
auto ms = t.MilliSeconds();
if (ms == 0)
- ms = 1; // round
+ ms = 1; // round up
HeadReadsHist.RecordValue(ms);
}
- Finish(ctx);
- return;
+ if (Inflight == 0) {
+ auto ts = TInstant::Now();
+ auto delta = ts - StartTsSubTest;
+
+ auto response = std::make_unique<TEvDataShardLoad::TEvTestLoadFinished>(0);
+ response->Report = TEvDataShardLoad::TLoadReport();
+ response->Report->Duration = delta;
+ response->Report->OperationsOK = Inflights[InflightIndex] * Config.GetRowCount();
+ response->Report->OperationsError = 0;
+
+ TStringStream ss;
+ i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0);
+ i64 v95 = HeadReadsHist.GetValueAtPercentile(95.00);
+ i64 v99 = HeadReadsHist.GetValueAtPercentile(99.00);
+ i64 v999 = HeadReadsHist.GetValueAtPercentile(99.9);
+
+ ss << "single row head read hist (ms):"
+ << "\n50%: " << v50
+ << "\n95%: " << v95
+ << "\n99%: " << v99
+ << "\n99.9%: " << v999
+ << Endl;
+
+ response->Report->Info = ss.Str();
+ ctx.Send(SelfId(), response.release());
+ }
}
void Finish(const TActorContext& ctx) {
@@ -663,21 +740,8 @@ private:
ss << report.ToString() << Endl;
}
- {
- i64 v50 = HeadReadsHist.GetValueAtPercentile(50.0);
- i64 v95 = HeadReadsHist.GetValueAtPercentile(95.00);
- i64 v99 = HeadReadsHist.GetValueAtPercentile(99.00);
- i64 v999 = HeadReadsHist.GetValueAtPercentile(99.9);
-
- ss << "HeadReads (ms): " << "50%: " << v50
- << "95%: " << v95
- << "99%: " << v99
- << "99.9%: " << v999
- << Endl;
- }
-
response->Report->Info = ss.Str();
- response->Report->SubtestCount = Results.size() + 1;
+ response->Report->SubtestCount = Results.size();
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Tag
<< " finished in " << delta << " with report:\n" << response->Report->Info);