aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-12-07 19:29:27 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-12-07 19:29:27 +0300
commit84d19cc6ce1ae6fff8fc42894a5f63da956a0b1a (patch)
tree0500cffeb6f9ffc56b23dd0acb564d78968b4d6d
parent4af13ba0a6ae5aee5c9ebe4e0d5784aaf507923c (diff)
downloadydb-84d19cc6ce1ae6fff8fc42894a5f63da956a0b1a.tar.gz
Refactor TKqpLoadActor
-rw-r--r--ydb/core/blobstorage/testload/test_load_actor.cpp2
-rw-r--r--ydb/core/blobstorage/testload/test_load_actor.h2
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp136
3 files changed, 52 insertions, 88 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_actor.cpp b/ydb/core/blobstorage/testload/test_load_actor.cpp
index 2ee91e81993..c05088b1ed2 100644
--- a/ydb/core/blobstorage/testload/test_load_actor.cpp
+++ b/ydb/core/blobstorage/testload/test_load_actor.cpp
@@ -234,7 +234,7 @@ public:
}
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new Kqp load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateKqpWriterTestLoad(
+ LoadActors.emplace(tag, ctx.Register(CreateKqpLoadActor(
cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag)));
break;
}
diff --git a/ydb/core/blobstorage/testload/test_load_actor.h b/ydb/core/blobstorage/testload/test_load_actor.h
index 6539245d245..6110db08182 100644
--- a/ydb/core/blobstorage/testload/test_load_actor.h
+++ b/ydb/core/blobstorage/testload/test_load_actor.h
@@ -55,7 +55,7 @@ namespace NKikimr {
const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
ui64 index, ui64 tag);
- NActors::IActor *CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+ NActors::IActor *CreateKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
ui64 index, ui64 tag);
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index bbed7bec552..84d30180f18 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -35,6 +35,32 @@ enum {
EvKqpWorkerResponse
};
+void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) {
+ TString query_text = TString(q.Query);
+ auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ request->Record.MutableRequest()->SetSessionId(session);
+ request->Record.MutableRequest()->SetKeepSession(true);
+ request->Record.MutableRequest()->SetDatabase(workingDir);
+
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->Record.MutableRequest()->SetQuery(query_text);
+
+ request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC);
+
+ const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params);
+ request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end());
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+
+ ctx.Send(kqp_proxy, request.Release());
+};
+
struct MonitoringData {
public:
MonitoringData()
@@ -180,36 +206,12 @@ private:
queries.pop_front();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
- << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size());
+ << " session: " << WorkerSession << " query type: " << WorkloadType
+ << ", params size: " << q.Params.GetValues().size());
Transactions->Inc();
- TString query_text = TString(q.Query);
- auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
-
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession);
-
- request->Record.MutableRequest()->SetSessionId(WorkerSession);
- request->Record.MutableRequest()->SetKeepSession(true);
- request->Record.MutableRequest()->SetDatabase(WorkingDir);
-
- request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- request->Record.MutableRequest()->SetQuery(query_text);
-
- request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
-
- request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC);
-
- const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params);
- request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end());
-
- auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
-
- ctx.Send(kqp_proxy, request.Release());
-
+ SendQueryRequest(ctx, q, WorkerSession, WorkingDir);
}
void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
@@ -288,20 +290,17 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr Transactions;
NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
-
};
-class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> {
+class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
public:
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_TEST_WORKLOAD;
}
- TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const TActorId& parent,
- const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
- ui64 index,
- ui64 tag)
+ TKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+ const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
+ ui64 index, ui64 tag)
: Parent(parent)
, Tag(tag)
{
@@ -318,7 +317,7 @@ public:
WindowDuration = cmd.GetWindowDuration();
WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration;
NumOfSessions = cmd.GetNumOfSessions();
- ChunkLoad.resize(WindowCount);
+ AnswersReceived.resize(WindowCount);
Chunk.reserve(WindowCount);
Total = std::make_unique<MonitoringData>();
for (size_t i = 0; i < WindowCount; ++i) {
@@ -364,13 +363,13 @@ public:
TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true);
}
- ~TKqpWriterTestLoadActor() {
+ ~TKqpLoadActor() {
LoadCounters->ResetCounters();
}
void Bootstrap(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called");
- Become(&TKqpWriterTestLoadActor::StateStart);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor Bootstrap called");
+ Become(&TKqpLoadActor::StateStart);
if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());
@@ -429,9 +428,9 @@ private:
}
void StartDeathProcess(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor StartDeathProcess called");
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor StartDeathProcess called");
- Become(&TKqpWriterTestLoadActor::StateEndOfWork);
+ Become(&TKqpLoadActor::StateEndOfWork);
if (DeleteTableOnFinish) {
DropTables(ctx);
@@ -509,28 +508,17 @@ private:
<< " Count: " << response->Data.WindowHist.GetTotalCount());
Chunk[response->Phase]->Add(response->Data);
- ChunkLoad[response->Phase] += 1;
+ AnswersReceived[response->Phase] += 1;
- if (ChunkLoad[Phase] == NumOfSessions) {
+ if (AnswersReceived[Phase] == NumOfSessions) {
Total->Add(*Chunk[Phase]);
- SendNewRowToParent(ctx);
+ UpdatePhase(ctx);
}
}
- void SendNewRowToParent(const TActorContext& ctx) {
+ void UpdatePhase(const TActorContext& ctx) {
Phase += 1;
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " total: Phase: " << Phase << " -> "
- << Total->WindowHist.GetTotalCount() << " | "
- << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | "
- << Total->WindowErrors << " | "
- << Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0 << " | "
- << Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0 << " | "
- << Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0 << " | "
- << Total->WindowHist.GetMax() / 1000.0
- );
-
if (Phase >= WindowCount) {
StartDeathProcess(ctx);
}
@@ -582,7 +570,7 @@ private:
auto& response = ev->Get()->Record.GetRef();
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
- Become(&TKqpWriterTestLoadActor::StateMain);
+ Become(&TKqpLoadActor::StateMain);
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created");
InitData = WorkloadQueryGen->GetInitialData();
InsertInitData(ctx);
@@ -602,38 +590,14 @@ private:
return;
}
+ Y_VERIFY(!InitData.empty());
auto q = std::move(InitData.front());
InitData.pop_front();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " Creating request for init query, need to exec: " << InitData.size() + 1);
-
- TString query_text = TString(q.Query);
-
- auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
-
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " using session: " << TableSession);
-
- request->Record.MutableRequest()->SetSessionId(TableSession);
- request->Record.MutableRequest()->SetKeepSession(true);
- request->Record.MutableRequest()->SetDatabase(WorkingDir);
-
- request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- request->Record.MutableRequest()->SetQuery(query_text);
-
- request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
-
- request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC);
-
- const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params);
- request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end());
-
- auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ << " Creating request for init query, need to exec: " << InitData.size() + 1 << " session: " << TableSession);
- ctx.Send(kqp_proxy, request.Release());
+ SendQueryRequest(ctx, q, TableSession, WorkingDir);
}
void HandleDataQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
@@ -774,7 +738,7 @@ private:
// Monitoring
std::vector<std::unique_ptr<MonitoringData>> Chunk;
- std::vector<ui64> ChunkLoad;
+ std::vector<ui64> AnswersReceived;
std::unique_ptr<MonitoringData> Total;
ui64 Phase = 0;
@@ -785,9 +749,9 @@ private:
};
-IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+IActor * CreateKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) {
- return new TKqpWriterTestLoadActor(cmd, parent, counters, index, tag);
+ return new TKqpLoadActor(cmd, parent, counters, index, tag);
}
} // NKikimr