summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <[email protected]>2023-01-10 12:27:33 +0300
committereivanov89 <[email protected]>2023-01-10 12:27:33 +0300
commit1026b06de8cc942b34df890d3c387fa435283169 (patch)
tree6cf7efde9ae899a3cc381cf039795dd38f080a71
parent7ed7a7a1c41d74356ce964b5dd13b842f3a8276d (diff)
add infinite mode to ycsb load actors
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp30
-rw-r--r--ydb/core/load_test/ycsb/kqp_upsert.cpp12
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp22
-rw-r--r--ydb/core/protos/datashard_load.proto6
4 files changed, 63 insertions, 7 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index fd5e4613f6e..6790bfab87c 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -214,15 +214,39 @@ private:
void SendRows(const TActorContext &ctx) {
while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) {
const auto* request = Requests[CurrentRequest].get();
- LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
- << "TUpsertActor# " << Id << " send request# " << CurrentRequest << ": " << request->ToString());
- NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id
+ << " send request# " << CurrentRequest << ": " << request->ToString());
+
+ if (!Config.GetInfinite()) {
+ NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
+ } else {
+ switch (RequestType) {
+ case ERequestType::UpsertBulk: {
+ const auto& casted = static_cast<const TEvDataShard::TEvUploadRowsRequest*>(request);
+ auto requestCopy = std::make_unique<TEvDataShard::TEvUploadRowsRequest>();
+ requestCopy->Record = casted->Record;
+ NTabletPipe::SendData(ctx, Pipe, requestCopy.release());
+ break;
+ } case ERequestType::UpsertLocalMkql: {
+ const auto& casted = static_cast<const TEvTablet::TEvLocalMKQL*>(request);
+ auto requestCopy = std::make_unique<TEvTablet::TEvLocalMKQL>();
+ requestCopy->Record = casted->Record;
+ NTabletPipe::SendData(ctx, Pipe, requestCopy.release());
+ break;
+ }
+ }
+ }
+
++CurrentRequest;
++Inflight;
}
}
void OnRequestDone(const TActorContext& ctx) {
+ if (Config.GetInfinite() && CurrentRequest >= Requests.size()) {
+ CurrentRequest = 0;
+ }
+
if (CurrentRequest < Requests.size()) {
SendRows(ctx);
} else if (Inflight == 0) {
diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp
index 54f2217c9aa..7b32495a03d 100644
--- a/ydb/core/load_test/ycsb/kqp_upsert.cpp
+++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp
@@ -153,7 +153,13 @@ private:
<< " send request# " << CurrentRequest
<< " to proxy# " << kqpProxy << ": " << request->ToString());
- ctx.Send(kqpProxy, Requests[CurrentRequest].release());
+ if (!Config.GetInfinite()) {
+ ctx.Send(kqpProxy, Requests[CurrentRequest].release());
+ } else {
+ auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ requestCopy->Record = request->Record;
+ ctx.Send(kqpProxy, requestCopy.release());
+ }
++CurrentRequest;
++Inflight;
@@ -161,6 +167,10 @@ private:
}
void OnRequestDone(const TActorContext& ctx) {
+ if (Config.GetInfinite() && CurrentRequest >= Requests.size()) {
+ CurrentRequest = 0;
+ }
+
if (CurrentRequest < Requests.size()) {
SendRows(ctx);
} else if (Inflight == 0) {
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 0cc161deb36..09790de8cfb 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -101,6 +101,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
TVector<TOwnedCellVec> Points;
ui64 ReadCount = 0;
+ const bool Infinite;
size_t CurrentPoint = 0;
THPTimer RequestTimer;
@@ -112,7 +113,8 @@ public:
const TActorId& parent,
const TSubLoadId& id,
const TVector<TOwnedCellVec>& points,
- ui64 readCount)
+ ui64 readCount,
+ bool infinite)
: BaseRequest(request)
, Format(BaseRequest->Record.GetResultFormat())
, TabletId(tablet)
@@ -120,6 +122,7 @@ public:
, Id(id)
, Points(points)
, ReadCount(readCount)
+ , Infinite(infinite)
{
RequestTimes.reserve(Points.size());
}
@@ -209,6 +212,10 @@ private:
RequestTimes.push_back(TDuration::Seconds(RequestTimer.Passed()));
+ if (Infinite && CurrentPoint >= Points.size()) {
+ CurrentPoint = 0;
+ }
+
if (CurrentPoint < Points.size()) {
SendRead(ctx);
return;
@@ -487,6 +494,10 @@ public:
}
}
+ if (Config.GetNoFullScan() || Config.GetInfinite()) {
+ ChunkSizes.clear();
+ }
+
if (Config.HasReadCount()) {
ReadCount = Config.GetReadCount();
} else {
@@ -560,7 +571,11 @@ private:
<< Target.GetWorkingDir() << "/" << Target.GetTableName()
<< " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size());
- State = EState::FullScan;
+ if (!ChunkSizes.empty()) {
+ State = EState::FullScan;
+ } else {
+ State = EState::FullScanGetKeys;
+ }
Run(ctx);
}
@@ -703,7 +718,8 @@ private:
SelfId(),
subId,
Keys,
- ReadCount);
+ ReadCount,
+ Config.GetInfinite());
StartedActors.emplace_back(ctx.Register(readActor));
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 1e79fe889ba..1991a020c00 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -45,6 +45,12 @@ message TEvYCSBTestLoadRequest {
// Specifies the format for result data in TEvReadResult
optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 5;
+
+ // special mode: actor reads RowCount rows again and again
+ // implies no full scan and only first inflight will be used
+ optional bool Infinite = 6;
+
+ optional bool NoFullScan = 7;
}
message TTableSetup {