aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-10 14:22:59 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-10 14:22:59 +0300
commit712a7a36204f16a88f8c57a0e3cc120f3e592718 (patch)
tree5ac4b0c8ebcca2b951845b9ac1444d571cbdaf6a
parentc67ba9c337d27dcb784fd0df4ad522fe2b32f725 (diff)
downloadydb-712a7a36204f16a88f8c57a0e3cc120f3e592718.tar.gz
PR from branch users/eivanov89/-ycsb-load-actors-fixes
properly stop DS load actor and provide HTML results rework kqp select actor: reduce memory footptint avoid huge memory consumption by ycsb upsert actors cleanup logs
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp124
-rw-r--r--ydb/core/load_test/ycsb/kqp_select.cpp143
-rw-r--r--ydb/core/load_test/ycsb/kqp_upsert.cpp10
-rw-r--r--ydb/core/load_test/ycsb/test_load_actor.cpp56
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp29
-rw-r--r--ydb/core/protos/datashard_load.proto2
6 files changed, 200 insertions, 164 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index 574553c5148..ca95d2a1725 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -21,6 +21,8 @@ using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest
namespace {
+const ui64 RECONNECT_LIMIT = 10;
+
enum class ERequestType {
UpsertBulk,
UpsertLocalMkql,
@@ -89,35 +91,24 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TSt
return TUploadRequest(request.release());
}
-TRequestsVector GenerateRequests(
+TUploadRequest GenerateRequest(
ui64 tableId,
ui64 keyFrom,
- ui64 n,
- ui64 batchSize, // only bulk requests
+ ui64 batchSize, // only bulk requests, otherwise 1
ERequestType requestType,
const TString& table)
{
- TRequestsVector requests;
- requests.reserve(n);
-
- for (size_t i = keyFrom; i < keyFrom + n; ++i) {
- switch (requestType) {
- case ERequestType::UpsertBulk: {
- auto keysLeft = keyFrom + n - i;
- auto currentBatchSize = Max(Min(batchSize, keysLeft), ui64(1));
- requests.emplace_back(GenerateBulkRowRequest(tableId, i, currentBatchSize));
- break;
- }
- case ERequestType::UpsertLocalMkql:
- requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table));
- break;
- default:
- // should not happen, just for compiler
- Y_FAIL("Unsupported request type");
- }
+ switch (requestType) {
+ case ERequestType::UpsertBulk:
+ return GenerateBulkRowRequest(tableId, keyFrom, batchSize);
+ break;
+ case ERequestType::UpsertLocalMkql:
+ return GenerateMkqlRowRequest(tableId, keyFrom, table);
+ break;
+ default:
+ // should not happen, just for compiler
+ Y_FAIL("Unsupported request type");
}
-
- return requests;
}
class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
@@ -130,10 +121,9 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
TActorId Pipe;
bool WasConnected = false;
- ui64 ReconnectLimit = 10;
+ ui64 ReconnectLimit = RECONNECT_LIMIT;
- TRequestsVector Requests;
- size_t CurrentRequest = 0;
+ size_t CurrentRow = 0;
size_t Inflight = 0;
TInstant StartTs;
@@ -164,17 +154,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
- << " TUpsertActor Bootstrap called: " << ConfingString);
-
- // note that we generate all requests at once to send at max speed, i.e.
- // do not mess with protobufs, strings, etc when send data
- Requests = GenerateRequests(
- Target.GetTableId(),
- Config.GetKeyFrom(),
- Config.GetRowCount(),
- Config.GetBatchSize(),
- RequestType,
- Target.GetTableName());
+ << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType));
Become(&TUpsertActor::StateFunc);
Connect(ctx);
@@ -182,7 +162,15 @@ public:
private:
void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called");
+ if (ReconnectLimit != RECONNECT_LIMIT) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id
+ << " will reconnect to tablet# " << Target.GetTabletId()
+ << " retries left# " << (ReconnectLimit - 1));
+ } else {
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id
+ << " will connect to tablet# " << Target.GetTabletId());
+ }
+
--ReconnectLimit;
if (ReconnectLimit == 0) {
TStringStream ss;
@@ -195,7 +183,7 @@ private:
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
@@ -221,42 +209,34 @@ private:
}
void SendRows(const TActorContext &ctx) {
- while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) {
- const auto* request = Requests[CurrentRequest].get();
+ while (Inflight < Config.GetInflight() && CurrentRow < Config.GetRowCount()) {
+ auto rowsLest = Config.GetRowCount() - CurrentRow;
+ auto batchSize = Min(size_t{Config.GetBatchSize()}, rowsLest);
+
+ auto request = GenerateRequest(
+ Target.GetTableId(),
+ Config.GetKeyFrom() + CurrentRow,
+ batchSize,
+ RequestType,
+ Target.GetTableName());
+
LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id
- << " send request# " << CurrentRequest << ": " << request->ToString());
-
- if (!Config.GetInfinite()) {
- NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release());
- } else {
- switch (RequestType) {
- case ERequestType::UpsertBulk: {
- const auto& casted = static_cast<const TEvDataShard::TEvUploadRowsRequest*>(request);
- auto requestCopy = std::make_unique<TEvDataShard::TEvUploadRowsRequest>();
- requestCopy->Record = casted->Record;
- NTabletPipe::SendData(ctx, Pipe, requestCopy.release());
- break;
- } case ERequestType::UpsertLocalMkql: {
- const auto& casted = static_cast<const TEvTablet::TEvLocalMKQL*>(request);
- auto requestCopy = std::make_unique<TEvTablet::TEvLocalMKQL>();
- requestCopy->Record = casted->Record;
- NTabletPipe::SendData(ctx, Pipe, requestCopy.release());
- break;
- }
- }
- }
-
- ++CurrentRequest;
+ << " sends request# " << CurrentRow << " with batchSize# " << batchSize
+ << ": " << request->ToString());
+
+ NTabletPipe::SendData(ctx, Pipe, request.release());
+
+ CurrentRow += batchSize;
++Inflight;
}
}
void OnRequestDone(const TActorContext& ctx) {
- if (Config.GetInfinite() && CurrentRequest >= Requests.size()) {
- CurrentRequest = 0;
+ if (Config.GetInfinite() && CurrentRow >= Config.GetRowCount()) {
+ CurrentRow = 0;
}
- if (CurrentRequest < Requests.size()) {
+ if (CurrentRow < Config.GetRowCount()) {
SendRows(ctx);
} else if (Inflight == 0) {
EndTs = TInstant::Now();
@@ -266,7 +246,7 @@ private:
auto& report = *response->Record.MutableReport();
report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
- report.SetOperationsOK(Requests.size() - Errors);
+ report.SetOperationsOK(Config.GetRowCount() - Errors);
report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
@@ -315,10 +295,10 @@ private:
TStringStream str;
HTML(str) {
str << "DS bulk upsert load actor# " << Id << " started on " << StartTs
- << " sent " << CurrentRequest << " out of " << Requests.size();
+ << " sent " << CurrentRow << " out of " << Config.GetRowCount();
TInstant ts = EndTs ? EndTs : TInstant::Now();
auto delta = ts - StartTs;
- auto throughput = Requests.size() * 1000 / delta.MilliSeconds();
+ auto throughput = Config.GetRowCount() * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1);
str << " in " << delta << " (" << throughput << " op/s)"
<< " errors=" << Errors;
}
@@ -327,13 +307,13 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die");
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die");
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
+ LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason);
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp
index 42589476c16..ac4613c2d92 100644
--- a/ydb/core/load_test/ycsb/kqp_select.cpp
+++ b/ydb/core/load_test/ycsb/kqp_select.cpp
@@ -58,6 +58,27 @@ TQueryInfo GenerateSelect(const TString& table, const TString& key) {
return TQueryInfo(str.Str(), std::move(params));
}
+std::unique_ptr<NKqp::TEvKqp::TEvQueryRequest> GenerateSelectRequest(const TString& db, const TString& table, const TString& key) {
+ auto queryInfo = GenerateSelect(table, key);
+
+ auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ request->Record.MutableRequest()->SetKeepSession(true);
+ request->Record.MutableRequest()->SetDatabase(db);
+
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->Record.MutableRequest()->SetQuery(queryInfo.Query);
+
+ request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params);
+ request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end());
+
+ return request;
+}
+
// it's a partial copy-paste from TUpsertActor: logic slightly differs, so that
// it seems better to have copy-paste rather if/else for different loads
class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> {
@@ -65,12 +86,17 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> {
const TActorId Parent;
const TSubLoadId Id;
const TString Database;
-
+ const TString TableName;
+ const TVector<TString>& Keys;
+ const size_t FromKey;
+ size_t CurrentKey = 0;
+ const size_t ReadCount;
+ const bool Infinite;
+
+ TActorId KqpProxyId;
TString Session;
- TRequestsVector Requests;
- bool Infinite;
- size_t CurrentRequest = 0;
+ size_t KeysRead = 0;
TInstant StartTs;
TInstant EndTs;
@@ -82,76 +108,78 @@ public:
const TActorId& parent,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TSubLoadId& id,
- TRequestsVector requests,
+ const TVector<TString>& keys,
+ size_t fromKey,
+ size_t readCount,
bool infinite)
: Target(target)
, Parent(parent)
, Id(id)
, Database(Target.GetWorkingDir())
- , Requests(std::move(requests))
+ , TableName(Target.GetTableName())
+ , Keys(keys)
+ , FromKey(fromKey)
+ , CurrentKey(fromKey)
+ , ReadCount(readCount)
, Infinite(infinite)
{
Y_UNUSED(counters);
}
void Bootstrap(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
<< " Bootstrap called");
+ KqpProxyId = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+
Become(&TKqpSelectActor::StateFunc);
CreateSession(ctx);
}
private:
void CreateSession(const TActorContext& ctx) {
- auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
- << " sends event for session creation to proxy: " << kqpProxy.ToString());
+ << " sends event for session creation to proxy: " << KqpProxyId.ToString());
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
ev->Record.MutableRequest()->SetDatabase(Database);
- Send(kqpProxy, ev.Release());
+ Send(KqpProxyId, ev.Release());
}
void CloseSession(const TActorContext& ctx) {
if (!Session)
return;
- auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
- << " sends session close query to proxy: " << kqpProxy);
+ << " sends session close query to proxy: " << KqpProxyId);
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(Session);
- ctx.Send(kqpProxy, ev.Release());
+ ctx.Send(KqpProxyId, ev.Release());
}
void ReadRow(const TActorContext &ctx) {
- auto* request = static_cast<NKqp::TEvKqp::TEvQueryRequest*>(Requests[CurrentRequest].get());
+ auto request = GenerateSelectRequest(Database, TableName, Keys[CurrentKey]);
request->Record.MutableRequest()->SetSessionId(Session);
- auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id
- << " send request# " << CurrentRequest
- << " to proxy# " << kqpProxy << ": " << request->ToString());
-
- if (!Infinite) {
- ctx.Send(kqpProxy, Requests[CurrentRequest].release());
- } else {
- auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
- requestCopy->Record = request->Record;
- ctx.Send(kqpProxy, requestCopy.release());
- }
+ << " send request# " << CurrentKey
+ << " to proxy# " << KqpProxyId << ": " << request->ToString());
- ++CurrentRequest;
+ ctx.Send(KqpProxyId, request.release());
+ ++CurrentKey;
+ ++KeysRead;
}
void OnRequestDone(const TActorContext& ctx) {
- if (Infinite && CurrentRequest >= Requests.size()) {
- CurrentRequest = 0;
+ if (Infinite && KeysRead == ReadCount) {
+ KeysRead = 0;
+ CurrentKey = FromKey;
}
- if (CurrentRequest < Requests.size()) {
+ CurrentKey = CurrentKey % Keys.size();
+
+ if (KeysRead < ReadCount) {
ReadRow(ctx);
} else {
EndTs = TInstant::Now();
@@ -161,7 +189,7 @@ private:
auto& report = *response->Record.MutableReport();
report.SetTag(Id.SubTag);
report.SetDurationMs(delta.MilliSeconds());
- report.SetOperationsOK(Requests.size() - Errors);
+ report.SetOperationsOK(ReadCount - Errors);
report.SetOperationsError(Errors);
ctx.Send(Parent, response.release());
@@ -185,6 +213,7 @@ private:
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
Session = response.GetResponse().GetSessionId();
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " session: " << Session);
+ StartTs = TInstant::Now();
ReadRow(ctx);
} else {
StopWithError(ctx, "failed to create session: " + ev->Get()->ToString());
@@ -213,10 +242,10 @@ private:
TStringStream str;
HTML(str) {
str << "TKqpSelectActor# " << Id << " started on " << StartTs
- << " sent " << CurrentRequest << " out of " << Requests.size();
+ << " sent " << CurrentKey << " out of " << ReadCount;
TInstant ts = EndTs ? EndTs : TInstant::Now();
auto delta = ts - StartTs;
- auto throughput = Requests.size() / delta.Seconds();
+ auto throughput = ReadCount * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1);
str << " in " << delta << " (" << throughput << " op/s)"
<< " errors=" << Errors;
}
@@ -252,7 +281,7 @@ class TKqpSelectActorMultiSession : public TActorBootstrapped<TKqpSelectActorMul
TVector<ui32> KeyColumnIds;
TVector<ui32> AllColumnIds;
- TVector<TOwnedCellVec> Keys;
+ TVector<TString> Keys;
ui64 LastReadId = 0;
ui64 LastSubTag = 0;
@@ -373,9 +402,15 @@ private:
}
void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) {
- Keys = std::move(ev->Get()->Keys);
- if (Keys.size() == 0) {
- return StopWithError(ctx, "Failed to read keys");
+ const auto& keyCells = ev->Get()->Keys;
+ if (keyCells.size() == 0) {
+ return StopWithError(ctx, "Failed to read keys or no keys");
+ }
+
+ Keys.reserve(keyCells.size());
+ for (const auto& keyCell: keyCells) {
+ TStringBuf keyBuf = keyCell[0].AsBuf();
+ Keys.emplace_back(keyBuf.data(), keyBuf.size());
}
LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id
@@ -390,38 +425,6 @@ private:
TVector<TRequestsVector> perActorRequests;
perActorRequests.reserve(Inflight);
- size_t keyCounter = 0;
- for (size_t i = 0; i < Inflight; ++i) {
- TRequestsVector requests;
-
- const auto& keyCell = Keys[keyCounter++ % Keys.size()][0];
- TStringBuf keyBuf = keyCell.AsBuf();
- TString key(keyBuf.data(), keyBuf.size());
-
- requests.reserve(ReadCount);
- for (size_t i = 0; i < ReadCount; ++i) {
- auto queryInfo = GenerateSelect(Target.GetTableName(), key);
-
- auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
- request->Record.MutableRequest()->SetKeepSession(true);
- request->Record.MutableRequest()->SetDatabase(Database);
-
- request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- request->Record.MutableRequest()->SetQuery(queryInfo.Query);
-
- request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
-
- const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params);
- request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end());
-
- requests.emplace_back(std::move(request));
- }
- perActorRequests.emplace_back(std::move(requests));
- }
-
StartTs = TInstant::Now();
Actors.reserve(Inflight);
@@ -433,7 +436,9 @@ private:
SelfId(),
Counters,
subId,
- std::move(perActorRequests[i]),
+ Keys,
+ 0, // keyFrom
+ ReadCount,
Config.GetInfinite());
Actors.emplace_back(ctx.Register(kqpActor));
}
diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp
index 25dd625c99d..645994370cf 100644
--- a/ydb/core/load_test/ycsb/kqp_upsert.cpp
+++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp
@@ -122,7 +122,7 @@ public:
private:
void CreateSession(const TActorContext& ctx) {
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " sends event for session creation to proxy: " << kqpProxy.ToString());
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
@@ -135,7 +135,7 @@ private:
return;
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id
<< " sends session close query to proxy: " << kqpProxy);
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
@@ -373,7 +373,7 @@ private:
return;
}
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString());
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString());
Errors += record.GetReport().GetOperationsError();
Oks += record.GetReport().GetOperationsOK();
@@ -407,13 +407,13 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " tablet recieved PoisonPill, going to die");
Stop(ctx);
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
+ LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id
<< " stopped with error: " << reason);
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason));
diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp
index b06494c758e..552cea721c9 100644
--- a/ydb/core/load_test/ycsb/test_load_actor.cpp
+++ b/ydb/core/load_test/ycsb/test_load_actor.cpp
@@ -105,7 +105,7 @@ public:
void CreateSession(const TActorContext& ctx) {
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " sends event for session creation to proxy# " << kqpProxy.ToString());
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
@@ -118,7 +118,7 @@ public:
return;
auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " sends session close query to proxy: " << kqpProxy);
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
@@ -131,7 +131,7 @@ public:
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
Session = response.GetResponse().GetSessionId();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " session: " << Session);
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " session: " << Session);
PrepareTable(ctx);
} else {
StopWithError(ctx, "failed to create session: " + ev->Get()->ToString());
@@ -443,7 +443,7 @@ public:
return PrepareTable(ctx);
}
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " received finished from actor# " << ev->Sender << " with tag# " << record.GetTag());
FinishedTests.push_back({record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()});
@@ -461,12 +461,16 @@ public:
ui64 oks = 0;
ui64 errors = 0;
ui64 subtestCount = 0;
+ ui64 actualDurationMs = 0; // i.e. time for RPS calculation
for (const auto& test: FinishedTests) {
oks += test.Report.GetOperationsOK();
errors += test.Report.GetOperationsError();
subtestCount += test.Report.GetSubtestCount();
+ actualDurationMs += test.Report.GetDurationMs();
}
+ size_t rps = oks * 1000 / actualDurationMs ? actualDurationMs : 1;
+
TIntrusivePtr<TEvLoad::TLoadReport> report(new TEvLoad::TLoadReport());
report->Duration = duration;
@@ -475,11 +479,49 @@ public:
value["oks"] = oks;
value["errors"] = errors;
value["subtests"] = subtestCount;
+ value["rps"] = rps;
+
+ TString configString;
+ google::protobuf::TextFormat::PrintToString(Request, &configString);
+
+#define PARAM(NAME, VALUE) \
+ TABLER() { \
+ TABLED() { str << NAME; } \
+ TABLED() { str << VALUE; } \
+ }
+
+ TStringStream str;
+ HTML(str) {
+ TABLE_CLASS("table table-condensed") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { str << "Parameter"; }
+ TABLEH() { str << "Value"; }
+ }
+ }
+ TABLEBODY() {
+ PARAM("Elapsed time total", duration.Seconds() << "s");
+ PARAM("Elapsed RPS time (all actors)", TDuration::MilliSeconds(actualDurationMs).Seconds() << "s");
+ PARAM("RPS", rps);
+ PARAM("OKs", oks);
+ PARAM("Errors", errors);
+ PARAM("Finished subactors", FinishedTests.size())
+ }
+ }
+ DIV() {
+ str << configString;
+ }
+ }
+
+#undef PARAM
auto finishEv = std::make_unique<TEvLoad::TEvLoadTestFinished>(Tag, report);
finishEv->JsonResult = std::move(value);
+ finishEv->LastHtmlPage = str.Str();
+
ctx.Send(Parent, finishEv.release());
- Die(ctx);
+
+ Stop(ctx);
}
void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
@@ -550,13 +592,13 @@ public:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " actor recieved PoisonPill, going to die with subactorsCount# " << LoadActors.size());
Stop(ctx);
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " stopped with error: " << reason << ", killing subactorsCount# " << LoadActors.size());
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason));
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 93c307c0aa5..4d467bbe8d1 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -26,6 +26,8 @@ namespace NKikimr::NDataShardLoad {
namespace {
+const ui64 RECONNECT_LIMIT = 10;
+
// TReadIteratorPoints
class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
@@ -37,7 +39,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
TActorId Pipe;
bool WasConnected = false;
- ui64 ReconnectLimit = 10;
+ ui64 ReconnectLimit = RECONNECT_LIMIT;
TInstant StartTs; // actor started to send requests
@@ -70,7 +72,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< " Bootstrap called, will read keys# " << Points.size());
Become(&TReadIteratorPoints::StateFunc);
@@ -86,8 +88,15 @@ public:
private:
void Connect(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
- << " Connect to# " << TabletId << " called");
+ if (ReconnectLimit != RECONNECT_LIMIT) {
+ LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " will reconnect to tablet# " << TabletId
+ << " retries left# " << (ReconnectLimit - 1));
+ } else {
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ << " will connect to tablet# " << TabletId);
+ }
+
--ReconnectLimit;
if (ReconnectLimit == 0) {
TStringStream ss;
@@ -100,7 +109,7 @@ private:
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< " Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
@@ -181,7 +190,7 @@ private:
}
void StopWithError(const TActorContext& ctx, const TString& reason) {
- LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< ", stopped with error: " << reason);
ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason));
@@ -190,7 +199,7 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< " tablet recieved PoisonPill, going to die");
// TODO: cancel iterator
@@ -407,7 +416,7 @@ private:
auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys);
StartedActors.emplace_back(ctx.Register(actor));
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back());
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back());
}
void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
@@ -451,7 +460,7 @@ private:
return StopWithError(ctx, TStringBuilder() << "TEvTestLoadFinished while in " << State);
case EState::ReadHeadPoints: {
Y_VERIFY(Inflight == 0);
- LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex]
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex]
<< " finished: " << ev->Get()->ToString());
Errors += record.GetReport().GetOperationsError();
Oks += record.GetReport().GetOperationsOK();
@@ -602,7 +611,7 @@ private:
}
void HandlePoison(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id
<< " tablet recieved PoisonPill, going to die");
Stop(ctx);
}
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 8be193bae64..c6bdcf0b214 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -29,7 +29,7 @@ message TEvYCSBTestLoadRequest {
optional bool Infinite = 4;
// only for bulk upsert
- optional uint32 BatchSize = 5;
+ optional uint32 BatchSize = 5 [default = 1];
}
message TReadStart {