diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-10 14:22:59 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-10 14:22:59 +0300 |
commit | 712a7a36204f16a88f8c57a0e3cc120f3e592718 (patch) | |
tree | 5ac4b0c8ebcca2b951845b9ac1444d571cbdaf6a | |
parent | c67ba9c337d27dcb784fd0df4ad522fe2b32f725 (diff) | |
download | ydb-712a7a36204f16a88f8c57a0e3cc120f3e592718.tar.gz |
PR from branch users/eivanov89/-ycsb-load-actors-fixes
properly stop DS load actor and provide HTML results
rework kqp select actor: reduce memory footptint
avoid huge memory consumption by ycsb upsert actors
cleanup logs
-rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 124 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_select.cpp | 143 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/kqp_upsert.cpp | 10 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_actor.cpp | 56 | ||||
-rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 29 | ||||
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 2 |
6 files changed, 200 insertions, 164 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index 574553c5148..ca95d2a1725 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -21,6 +21,8 @@ using TUploadRowsRequestPtr = std::unique_ptr<TEvDataShard::TEvUploadRowsRequest namespace { +const ui64 RECONNECT_LIMIT = 10; + enum class ERequestType { UpsertBulk, UpsertLocalMkql, @@ -89,35 +91,24 @@ TUploadRequest GenerateMkqlRowRequest(ui64 /* tableId */, ui64 keyNum, const TSt return TUploadRequest(request.release()); } -TRequestsVector GenerateRequests( +TUploadRequest GenerateRequest( ui64 tableId, ui64 keyFrom, - ui64 n, - ui64 batchSize, // only bulk requests + ui64 batchSize, // only bulk requests, otherwise 1 ERequestType requestType, const TString& table) { - TRequestsVector requests; - requests.reserve(n); - - for (size_t i = keyFrom; i < keyFrom + n; ++i) { - switch (requestType) { - case ERequestType::UpsertBulk: { - auto keysLeft = keyFrom + n - i; - auto currentBatchSize = Max(Min(batchSize, keysLeft), ui64(1)); - requests.emplace_back(GenerateBulkRowRequest(tableId, i, currentBatchSize)); - break; - } - case ERequestType::UpsertLocalMkql: - requests.emplace_back(GenerateMkqlRowRequest(tableId, i, table)); - break; - default: - // should not happen, just for compiler - Y_FAIL("Unsupported request type"); - } + switch (requestType) { + case ERequestType::UpsertBulk: + return GenerateBulkRowRequest(tableId, keyFrom, batchSize); + break; + case ERequestType::UpsertLocalMkql: + return GenerateMkqlRowRequest(tableId, keyFrom, table); + break; + default: + // should not happen, just for compiler + Y_FAIL("Unsupported request type"); } - - return requests; } class TUpsertActor : public TActorBootstrapped<TUpsertActor> { @@ -130,10 +121,9 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> { TActorId Pipe; bool WasConnected = false; - ui64 ReconnectLimit = 10; + ui64 ReconnectLimit = RECONNECT_LIMIT; - TRequestsVector Requests; - size_t CurrentRequest = 0; + size_t CurrentRow = 0; size_t Inflight = 0; TInstant StartTs; @@ -164,17 +154,7 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id - << " TUpsertActor Bootstrap called: " << ConfingString); - - // note that we generate all requests at once to send at max speed, i.e. - // do not mess with protobufs, strings, etc when send data - Requests = GenerateRequests( - Target.GetTableId(), - Config.GetKeyFrom(), - Config.GetRowCount(), - Config.GetBatchSize(), - RequestType, - Target.GetTableName()); + << " TUpsertActor Bootstrap called: " << ConfingString << " with type# " << int(RequestType)); Become(&TUpsertActor::StateFunc); Connect(ctx); @@ -182,7 +162,15 @@ public: private: void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called"); + if (ReconnectLimit != RECONNECT_LIMIT) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id + << " will reconnect to tablet# " << Target.GetTabletId() + << " retries left# " << (ReconnectLimit - 1)); + } else { + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id + << " will connect to tablet# " << Target.GetTabletId()); + } + --ReconnectLimit; if (ReconnectLimit == 0) { TStringStream ss; @@ -195,7 +183,7 @@ private: void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { @@ -221,42 +209,34 @@ private: } void SendRows(const TActorContext &ctx) { - while (Inflight < Config.GetInflight() && CurrentRequest < Requests.size()) { - const auto* request = Requests[CurrentRequest].get(); + while (Inflight < Config.GetInflight() && CurrentRow < Config.GetRowCount()) { + auto rowsLest = Config.GetRowCount() - CurrentRow; + auto batchSize = Min(size_t{Config.GetBatchSize()}, rowsLest); + + auto request = GenerateRequest( + Target.GetTableId(), + Config.GetKeyFrom() + CurrentRow, + batchSize, + RequestType, + Target.GetTableName()); + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TUpsertActor# " << Id - << " send request# " << CurrentRequest << ": " << request->ToString()); - - if (!Config.GetInfinite()) { - NTabletPipe::SendData(ctx, Pipe, Requests[CurrentRequest].release()); - } else { - switch (RequestType) { - case ERequestType::UpsertBulk: { - const auto& casted = static_cast<const TEvDataShard::TEvUploadRowsRequest*>(request); - auto requestCopy = std::make_unique<TEvDataShard::TEvUploadRowsRequest>(); - requestCopy->Record = casted->Record; - NTabletPipe::SendData(ctx, Pipe, requestCopy.release()); - break; - } case ERequestType::UpsertLocalMkql: { - const auto& casted = static_cast<const TEvTablet::TEvLocalMKQL*>(request); - auto requestCopy = std::make_unique<TEvTablet::TEvLocalMKQL>(); - requestCopy->Record = casted->Record; - NTabletPipe::SendData(ctx, Pipe, requestCopy.release()); - break; - } - } - } - - ++CurrentRequest; + << " sends request# " << CurrentRow << " with batchSize# " << batchSize + << ": " << request->ToString()); + + NTabletPipe::SendData(ctx, Pipe, request.release()); + + CurrentRow += batchSize; ++Inflight; } } void OnRequestDone(const TActorContext& ctx) { - if (Config.GetInfinite() && CurrentRequest >= Requests.size()) { - CurrentRequest = 0; + if (Config.GetInfinite() && CurrentRow >= Config.GetRowCount()) { + CurrentRow = 0; } - if (CurrentRequest < Requests.size()) { + if (CurrentRow < Config.GetRowCount()) { SendRows(ctx); } else if (Inflight == 0) { EndTs = TInstant::Now(); @@ -266,7 +246,7 @@ private: auto& report = *response->Record.MutableReport(); report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); - report.SetOperationsOK(Requests.size() - Errors); + report.SetOperationsOK(Config.GetRowCount() - Errors); report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); @@ -315,10 +295,10 @@ private: TStringStream str; HTML(str) { str << "DS bulk upsert load actor# " << Id << " started on " << StartTs - << " sent " << CurrentRequest << " out of " << Requests.size(); + << " sent " << CurrentRow << " out of " << Config.GetRowCount(); TInstant ts = EndTs ? EndTs : TInstant::Now(); auto delta = ts - StartTs; - auto throughput = Requests.size() * 1000 / delta.MilliSeconds(); + auto throughput = Config.GetRowCount() * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1); str << " in " << delta << " (" << throughput << " op/s)" << " errors=" << Errors; } @@ -327,13 +307,13 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die"); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die"); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); + LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "Load tablet stopped with error: " << reason); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); NTabletPipe::CloseClient(SelfId(), Pipe); Die(ctx); diff --git a/ydb/core/load_test/ycsb/kqp_select.cpp b/ydb/core/load_test/ycsb/kqp_select.cpp index 42589476c16..ac4613c2d92 100644 --- a/ydb/core/load_test/ycsb/kqp_select.cpp +++ b/ydb/core/load_test/ycsb/kqp_select.cpp @@ -58,6 +58,27 @@ TQueryInfo GenerateSelect(const TString& table, const TString& key) { return TQueryInfo(str.Str(), std::move(params)); } +std::unique_ptr<NKqp::TEvKqp::TEvQueryRequest> GenerateSelectRequest(const TString& db, const TString& table, const TString& key) { + auto queryInfo = GenerateSelect(table, key); + + auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + request->Record.MutableRequest()->SetKeepSession(true); + request->Record.MutableRequest()->SetDatabase(db); + + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->Record.MutableRequest()->SetQuery(queryInfo.Query); + + request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params); + request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end()); + + return request; +} + // it's a partial copy-paste from TUpsertActor: logic slightly differs, so that // it seems better to have copy-paste rather if/else for different loads class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> { @@ -65,12 +86,17 @@ class TKqpSelectActor : public TActorBootstrapped<TKqpSelectActor> { const TActorId Parent; const TSubLoadId Id; const TString Database; - + const TString TableName; + const TVector<TString>& Keys; + const size_t FromKey; + size_t CurrentKey = 0; + const size_t ReadCount; + const bool Infinite; + + TActorId KqpProxyId; TString Session; - TRequestsVector Requests; - bool Infinite; - size_t CurrentRequest = 0; + size_t KeysRead = 0; TInstant StartTs; TInstant EndTs; @@ -82,76 +108,78 @@ public: const TActorId& parent, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TSubLoadId& id, - TRequestsVector requests, + const TVector<TString>& keys, + size_t fromKey, + size_t readCount, bool infinite) : Target(target) , Parent(parent) , Id(id) , Database(Target.GetWorkingDir()) - , Requests(std::move(requests)) + , TableName(Target.GetTableName()) + , Keys(keys) + , FromKey(fromKey) + , CurrentKey(fromKey) + , ReadCount(readCount) , Infinite(infinite) { Y_UNUSED(counters); } void Bootstrap(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " Bootstrap called"); + KqpProxyId = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + Become(&TKqpSelectActor::StateFunc); CreateSession(ctx); } private: void CreateSession(const TActorContext& ctx) { - auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id - << " sends event for session creation to proxy: " << kqpProxy.ToString()); + << " sends event for session creation to proxy: " << KqpProxyId.ToString()); auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); ev->Record.MutableRequest()->SetDatabase(Database); - Send(kqpProxy, ev.Release()); + Send(KqpProxyId, ev.Release()); } void CloseSession(const TActorContext& ctx) { if (!Session) return; - auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id - << " sends session close query to proxy: " << kqpProxy); + << " sends session close query to proxy: " << KqpProxyId); auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); ev->Record.MutableRequest()->SetSessionId(Session); - ctx.Send(kqpProxy, ev.Release()); + ctx.Send(KqpProxyId, ev.Release()); } void ReadRow(const TActorContext &ctx) { - auto* request = static_cast<NKqp::TEvKqp::TEvQueryRequest*>(Requests[CurrentRequest].get()); + auto request = GenerateSelectRequest(Database, TableName, Keys[CurrentKey]); request->Record.MutableRequest()->SetSessionId(Session); - auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id - << " send request# " << CurrentRequest - << " to proxy# " << kqpProxy << ": " << request->ToString()); - - if (!Infinite) { - ctx.Send(kqpProxy, Requests[CurrentRequest].release()); - } else { - auto requestCopy = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); - requestCopy->Record = request->Record; - ctx.Send(kqpProxy, requestCopy.release()); - } + << " send request# " << CurrentKey + << " to proxy# " << KqpProxyId << ": " << request->ToString()); - ++CurrentRequest; + ctx.Send(KqpProxyId, request.release()); + ++CurrentKey; + ++KeysRead; } void OnRequestDone(const TActorContext& ctx) { - if (Infinite && CurrentRequest >= Requests.size()) { - CurrentRequest = 0; + if (Infinite && KeysRead == ReadCount) { + KeysRead = 0; + CurrentKey = FromKey; } - if (CurrentRequest < Requests.size()) { + CurrentKey = CurrentKey % Keys.size(); + + if (KeysRead < ReadCount) { ReadRow(ctx); } else { EndTs = TInstant::Now(); @@ -161,7 +189,7 @@ private: auto& report = *response->Record.MutableReport(); report.SetTag(Id.SubTag); report.SetDurationMs(delta.MilliSeconds()); - report.SetOperationsOK(Requests.size() - Errors); + report.SetOperationsOK(ReadCount - Errors); report.SetOperationsError(Errors); ctx.Send(Parent, response.release()); @@ -185,6 +213,7 @@ private: if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { Session = response.GetResponse().GetSessionId(); LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActor# " << Id << " session: " << Session); + StartTs = TInstant::Now(); ReadRow(ctx); } else { StopWithError(ctx, "failed to create session: " + ev->Get()->ToString()); @@ -213,10 +242,10 @@ private: TStringStream str; HTML(str) { str << "TKqpSelectActor# " << Id << " started on " << StartTs - << " sent " << CurrentRequest << " out of " << Requests.size(); + << " sent " << CurrentKey << " out of " << ReadCount; TInstant ts = EndTs ? EndTs : TInstant::Now(); auto delta = ts - StartTs; - auto throughput = Requests.size() / delta.Seconds(); + auto throughput = ReadCount * 1000 / (delta.MilliSeconds() ? delta.MilliSeconds() : 1); str << " in " << delta << " (" << throughput << " op/s)" << " errors=" << Errors; } @@ -252,7 +281,7 @@ class TKqpSelectActorMultiSession : public TActorBootstrapped<TKqpSelectActorMul TVector<ui32> KeyColumnIds; TVector<ui32> AllColumnIds; - TVector<TOwnedCellVec> Keys; + TVector<TString> Keys; ui64 LastReadId = 0; ui64 LastSubTag = 0; @@ -373,9 +402,15 @@ private: } void Handle(TEvPrivate::TEvKeys::TPtr& ev, const TActorContext& ctx) { - Keys = std::move(ev->Get()->Keys); - if (Keys.size() == 0) { - return StopWithError(ctx, "Failed to read keys"); + const auto& keyCells = ev->Get()->Keys; + if (keyCells.size() == 0) { + return StopWithError(ctx, "Failed to read keys or no keys"); + } + + Keys.reserve(keyCells.size()); + for (const auto& keyCell: keyCells) { + TStringBuf keyBuf = keyCell[0].AsBuf(); + Keys.emplace_back(keyBuf.data(), keyBuf.size()); } LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpSelectActorMultiSession# " << Id @@ -390,38 +425,6 @@ private: TVector<TRequestsVector> perActorRequests; perActorRequests.reserve(Inflight); - size_t keyCounter = 0; - for (size_t i = 0; i < Inflight; ++i) { - TRequestsVector requests; - - const auto& keyCell = Keys[keyCounter++ % Keys.size()][0]; - TStringBuf keyBuf = keyCell.AsBuf(); - TString key(keyBuf.data(), keyBuf.size()); - - requests.reserve(ReadCount); - for (size_t i = 0; i < ReadCount; ++i) { - auto queryInfo = GenerateSelect(Target.GetTableName(), key); - - auto request = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); - request->Record.MutableRequest()->SetKeepSession(true); - request->Record.MutableRequest()->SetDatabase(Database); - - request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - request->Record.MutableRequest()->SetQuery(queryInfo.Query); - - request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - - const auto& params = NYdb::TProtoAccessor::GetProtoMap(queryInfo.Params); - request->Record.MutableRequest()->MutableYdbParameters()->insert(params.begin(), params.end()); - - requests.emplace_back(std::move(request)); - } - perActorRequests.emplace_back(std::move(requests)); - } - StartTs = TInstant::Now(); Actors.reserve(Inflight); @@ -433,7 +436,9 @@ private: SelfId(), Counters, subId, - std::move(perActorRequests[i]), + Keys, + 0, // keyFrom + ReadCount, Config.GetInfinite()); Actors.emplace_back(ctx.Register(kqpActor)); } diff --git a/ydb/core/load_test/ycsb/kqp_upsert.cpp b/ydb/core/load_test/ycsb/kqp_upsert.cpp index 25dd625c99d..645994370cf 100644 --- a/ydb/core/load_test/ycsb/kqp_upsert.cpp +++ b/ydb/core/load_test/ycsb/kqp_upsert.cpp @@ -122,7 +122,7 @@ public: private: void CreateSession(const TActorContext& ctx) { auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " sends event for session creation to proxy: " << kqpProxy.ToString()); auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); @@ -135,7 +135,7 @@ private: return; auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActor# " << Id << " sends session close query to proxy: " << kqpProxy); auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); @@ -373,7 +373,7 @@ private: return; } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString()); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "kqp# " << Id << " finished: " << ev->Get()->ToString()); Errors += record.GetReport().GetOperationsError(); Oks += record.GetReport().GetOperationsOK(); @@ -407,13 +407,13 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " tablet recieved PoisonPill, going to die"); Stop(ctx); } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id + LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TKqpUpsertActorMultiSession# " << Id << " stopped with error: " << reason); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Id.SubTag, reason)); diff --git a/ydb/core/load_test/ycsb/test_load_actor.cpp b/ydb/core/load_test/ycsb/test_load_actor.cpp index b06494c758e..552cea721c9 100644 --- a/ydb/core/load_test/ycsb/test_load_actor.cpp +++ b/ydb/core/load_test/ycsb/test_load_actor.cpp @@ -105,7 +105,7 @@ public: void CreateSession(const TActorContext& ctx) { auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " sends event for session creation to proxy# " << kqpProxy.ToString()); auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); @@ -118,7 +118,7 @@ public: return; auto kqpProxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " sends session close query to proxy: " << kqpProxy); auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); @@ -131,7 +131,7 @@ public: if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { Session = response.GetResponse().GetSessionId(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " session: " << Session); + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " session: " << Session); PrepareTable(ctx); } else { StopWithError(ctx, "failed to create session: " + ev->Get()->ToString()); @@ -443,7 +443,7 @@ public: return PrepareTable(ctx); } - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " received finished from actor# " << ev->Sender << " with tag# " << record.GetTag()); FinishedTests.push_back({record.GetTag(), record.GetErrorReason(), TAppData::TimeProvider->Now(), record.GetReport()}); @@ -461,12 +461,16 @@ public: ui64 oks = 0; ui64 errors = 0; ui64 subtestCount = 0; + ui64 actualDurationMs = 0; // i.e. time for RPS calculation for (const auto& test: FinishedTests) { oks += test.Report.GetOperationsOK(); errors += test.Report.GetOperationsError(); subtestCount += test.Report.GetSubtestCount(); + actualDurationMs += test.Report.GetDurationMs(); } + size_t rps = oks * 1000 / actualDurationMs ? actualDurationMs : 1; + TIntrusivePtr<TEvLoad::TLoadReport> report(new TEvLoad::TLoadReport()); report->Duration = duration; @@ -475,11 +479,49 @@ public: value["oks"] = oks; value["errors"] = errors; value["subtests"] = subtestCount; + value["rps"] = rps; + + TString configString; + google::protobuf::TextFormat::PrintToString(Request, &configString); + +#define PARAM(NAME, VALUE) \ + TABLER() { \ + TABLED() { str << NAME; } \ + TABLED() { str << VALUE; } \ + } + + TStringStream str; + HTML(str) { + TABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() { str << "Parameter"; } + TABLEH() { str << "Value"; } + } + } + TABLEBODY() { + PARAM("Elapsed time total", duration.Seconds() << "s"); + PARAM("Elapsed RPS time (all actors)", TDuration::MilliSeconds(actualDurationMs).Seconds() << "s"); + PARAM("RPS", rps); + PARAM("OKs", oks); + PARAM("Errors", errors); + PARAM("Finished subactors", FinishedTests.size()) + } + } + DIV() { + str << configString; + } + } + +#undef PARAM auto finishEv = std::make_unique<TEvLoad::TEvLoadTestFinished>(Tag, report); finishEv->JsonResult = std::move(value); + finishEv->LastHtmlPage = str.Str(); + ctx.Send(Parent, finishEv.release()); - Die(ctx); + + Stop(ctx); } void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { @@ -550,13 +592,13 @@ public: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " actor recieved PoisonPill, going to die with subactorsCount# " << LoadActors.size()); Stop(ctx); } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " stopped with error: " << reason << ", killing subactorsCount# " << LoadActors.size()); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(Tag, reason)); diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 93c307c0aa5..4d467bbe8d1 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -26,6 +26,8 @@ namespace NKikimr::NDataShardLoad { namespace { +const ui64 RECONNECT_LIMIT = 10; + // TReadIteratorPoints class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { @@ -37,7 +39,7 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { TActorId Pipe; bool WasConnected = false; - ui64 ReconnectLimit = 10; + ui64 ReconnectLimit = RECONNECT_LIMIT; TInstant StartTs; // actor started to send requests @@ -70,7 +72,7 @@ public: } void Bootstrap(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << " Bootstrap called, will read keys# " << Points.size()); Become(&TReadIteratorPoints::StateFunc); @@ -86,8 +88,15 @@ public: private: void Connect(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id - << " Connect to# " << TabletId << " called"); + if (ReconnectLimit != RECONNECT_LIMIT) { + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " will reconnect to tablet# " << TabletId + << " retries left# " << (ReconnectLimit - 1)); + } else { + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + << " will connect to tablet# " << TabletId); + } + --ReconnectLimit; if (ReconnectLimit == 0) { TStringStream ss; @@ -100,7 +109,7 @@ private: void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) { TEvTabletPipe::TEvClientConnected *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + LOG_TRACE_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << " Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { @@ -181,7 +190,7 @@ private: } void StopWithError(const TActorContext& ctx, const TString& reason) { - LOG_WARN_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + LOG_ERROR_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << ", stopped with error: " << reason); ctx.Send(Parent, new TEvDataShardLoad::TEvTestLoadFinished(0, reason)); @@ -190,7 +199,7 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << " tablet recieved PoisonPill, going to die"); // TODO: cancel iterator @@ -407,7 +416,7 @@ private: auto* actor = CreateReadIteratorScan(request.release(), TabletId, SelfId(), subId, sampleKeys); StartedActors.emplace_back(ctx.Register(actor)); - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back()); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "started fullscan actor# " << StartedActors.back()); } void Handle(const TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { @@ -451,7 +460,7 @@ private: return StopWithError(ctx, TStringBuilder() << "TEvTestLoadFinished while in " << State); case EState::ReadHeadPoints: { Y_VERIFY(Inflight == 0); - LOG_NOTICE_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex] + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "headread with inflight# " << Inflights[InflightIndex] << " finished: " << ev->Get()->ToString()); Errors += record.GetReport().GetOperationsError(); Oks += record.GetReport().GetOperationsOK(); @@ -602,7 +611,7 @@ private: } void HandlePoison(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorLoadScenario# " << Id << " tablet recieved PoisonPill, going to die"); Stop(ctx); } diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 8be193bae64..c6bdcf0b214 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -29,7 +29,7 @@ message TEvYCSBTestLoadRequest { optional bool Infinite = 4; // only for bulk upsert - optional uint32 BatchSize = 5; + optional uint32 BatchSize = 5 [default = 1]; } message TReadStart { |