diff options
| author | eivanov89 <[email protected]> | 2023-01-10 12:27:33 +0300 |
|---|---|---|
| committer | eivanov89 <[email protected]> | 2023-01-10 12:27:33 +0300 |
| commit | 1026b06de8cc942b34df890d3c387fa435283169 (patch) | |
| tree | 6cf7efde9ae899a3cc381cf039795dd38f080a71 | |
| parent | 7ed7a7a1c41d74356ce964b5dd13b842f3a8276d (diff) | |
add infinite mode to ycsb load actors
| -rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 30 | ||||
| -rw-r--r-- | ydb/core/load_test/ycsb/kqp_upsert.cpp | 12 | ||||
| -rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 22 | ||||
| -rw-r--r-- | ydb/core/protos/datashard_load.proto | 6 |
4 files changed, 63 insertions, 7 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index fd5e4613f6e..6790bfab87c 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -214,15 +214,39 @@ private: void SendRows(const TActorContext &ctx) { while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) { const auto* request = Requests[CurrentRequest].get(); - LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id - << "TUpsertActor# " << Id << " send request# " << CurrentRequest << ": " << request->ToString()); - NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release()); + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id + << " send request# " << CurrentRequest << ": " << request->ToString()); + + if (!Config.GetInfinite()) { + NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release()); + } else { + switch (RequestType) { + case ERequestType::UpsertBulk: { + const auto& casted = static_cast<const TEvDataShard::TEvUploadRowsRequest*>(request); + auto requestCopy = std::make_unique<TEvDataShard::TEvUploadRowsRequest>(); + requestCopy->Record = casted->Record; + NTabletPipe::SendData(ctx, Pipe, requestCopy.release()); + break; + } case ERequestType::UpsertLocalMkql: { + const auto& casted = static_cast<const TEvTablet::TEvLocalMKQL*>(request); + auto requestCopy = std::make_unique<TEvTablet::TEvLocalMKQL>(); + requestCopy->Record = casted->Record; + NTabletPipe::SendData(ctx, Pipe, requestCopy.release()); + break; + } + } + } + ++CurrentRequest; ++Inflight; } } void OnRequestDone(const TActorContext& ctx) { + if (Config.GetInfinite() && CurrentRequest >= Requests.size()) { + CurrentRequest = 0; + } + if (CurrentRequest < Requests.size()) { SendRows(ctx); } else if (Inflight == 0) { diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp index 54f2217c9aa..7b32495a03d 100644 --- a/ydb/core/load_test/ycsb/kqp_upsert.cpp +++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp @@ -153,7 +153,13 @@ private: << " send request# " << CurrentRequest << " to proxy# " << kqpProxy << ": " << request->ToString()); - ctx.Send(kqpProxy, Requests[CurrentRequest].release()); + if (!Config.GetInfinite()) { + ctx.Send(kqpProxy, Requests[CurrentRequest].release()); + } else { + auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + requestCopy->Record = request->Record; + ctx.Send(kqpProxy, requestCopy.release()); + } ++CurrentRequest; ++Inflight; @@ -161,6 +167,10 @@ private: } void OnRequestDone(const TActorContext& ctx) { + if (Config.GetInfinite() && CurrentRequest >= Requests.size()) { + CurrentRequest = 0; + } + if (CurrentRequest < Requests.size()) { SendRows(ctx); } else if (Inflight == 0) { diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 0cc161deb36..09790de8cfb 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -101,6 +101,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { TVector<TOwnedCellVec> Points; ui64 ReadCount = 0; + const bool Infinite; size_t CurrentPoint = 0; THPTimer RequestTimer; @@ -112,7 +113,8 @@ public: const TActorId& parent, const TSubLoadId& id, const TVector<TOwnedCellVec>& points, - ui64 readCount) + ui64 readCount, + bool infinite) : BaseRequest(request) , Format(BaseRequest->Record.GetResultFormat()) , TabletId(tablet) @@ -120,6 +122,7 @@ public: , Id(id) , Points(points) , ReadCount(readCount) + , Infinite(infinite) { RequestTimes.reserve(Points.size()); } @@ -209,6 +212,10 @@ private: RequestTimes.push_back(TDuration::Seconds(RequestTimer.Passed())); + if (Infinite && CurrentPoint >= Points.size()) { + CurrentPoint = 0; + } + if (CurrentPoint < Points.size()) { SendRead(ctx); return; @@ -487,6 +494,10 @@ public: } } + if (Config.GetNoFullScan() || Config.GetInfinite()) { + ChunkSizes.clear(); + } + if (Config.HasReadCount()) { ReadCount = Config.GetReadCount(); } else { @@ -560,7 +571,11 @@ private: << Target.GetWorkingDir() << "/" << Target.GetTableName() << " with columnsCount# " << AllColumnIds.size() << ", keyColumnCount# " << KeyColumnIds.size()); - State = EState::FullScan; + if (!ChunkSizes.empty()) { + State = EState::FullScan; + } else { + State = EState::FullScanGetKeys; + } Run(ctx); } @@ -703,7 +718,8 @@ private: SelfId(), subId, Keys, - ReadCount); + ReadCount, + Config.GetInfinite()); StartedActors.emplace_back(ctx.Register(readActor)); diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 1e79fe889ba..1991a020c00 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -45,6 +45,12 @@ message TEvYCSBTestLoadRequest { // Specifies the format for result data in TEvReadResult optional NKikimrTxDataShard.EScanDataFormat ResultFormat = 5; + + // special mode: actor reads RowCount rows again and again + // implies no full scan and only first inflight will be used + optional bool Infinite = 6; + + optional bool NoFullScan = 7; } message TTableSetup { |
