diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-07 19:29:27 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-07 19:29:27 +0300 |
commit | 84d19cc6ce1ae6fff8fc42894a5f63da956a0b1a (patch) | |
tree | 0500cffeb6f9ffc56b23dd0acb564d78968b4d6d | |
parent | 4af13ba0a6ae5aee5c9ebe4e0d5784aaf507923c (diff) | |
download | ydb-84d19cc6ce1ae6fff8fc42894a5f63da956a0b1a.tar.gz |
Refactor TKqpLoadActor
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_actor.h | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_kqp.cpp | 136 |
3 files changed, 52 insertions, 88 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_actor.cpp b/ydb/core/blobstorage/testload/test_load_actor.cpp index 2ee91e81993..c05088b1ed2 100644 --- a/ydb/core/blobstorage/testload/test_load_actor.cpp +++ b/ydb/core/blobstorage/testload/test_load_actor.cpp @@ -234,7 +234,7 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new Kqp load actor with tag# " << tag); - LoadActors.emplace(tag, ctx.Register(CreateKqpWriterTestLoad( + LoadActors.emplace(tag, ctx.Register(CreateKqpLoadActor( cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag))); break; } diff --git a/ydb/core/blobstorage/testload/test_load_actor.h b/ydb/core/blobstorage/testload/test_load_actor.h index 6539245d245..6110db08182 100644 --- a/ydb/core/blobstorage/testload/test_load_actor.h +++ b/ydb/core/blobstorage/testload/test_load_actor.h @@ -55,7 +55,7 @@ namespace NKikimr { const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); - NActors::IActor *CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, + NActors::IActor *CreateKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, const NActors::TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag); diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index bbed7bec552..84d30180f18 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -35,6 +35,32 @@ enum { EvKqpWorkerResponse }; +void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) { + TString query_text = TString(q.Query); + auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + request->Record.MutableRequest()->SetSessionId(session); + request->Record.MutableRequest()->SetKeepSession(true); + request->Record.MutableRequest()->SetDatabase(workingDir); + + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->Record.MutableRequest()->SetQuery(query_text); + + request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC); + + const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params); + request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end()); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + + ctx.Send(kqp_proxy, request.Release()); +}; + struct MonitoringData { public: MonitoringData() @@ -180,36 +206,12 @@ private: queries.pop_front(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag - << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size()); + << " session: " << WorkerSession << " query type: " << WorkloadType + << ", params size: " << q.Params.GetValues().size()); Transactions->Inc(); - TString query_text = TString(q.Query); - auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession); - - request->Record.MutableRequest()->SetSessionId(WorkerSession); - request->Record.MutableRequest()->SetKeepSession(true); - request->Record.MutableRequest()->SetDatabase(WorkingDir); - - request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - request->Record.MutableRequest()->SetQuery(query_text); - - request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - - request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC); - - const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params); - request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end()); - - auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - - ctx.Send(kqp_proxy, request.Release()); - + SendQueryRequest(ctx, q, WorkerSession, WorkingDir); } void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { @@ -288,20 +290,17 @@ private: NMonitoring::TDynamicCounters::TCounterPtr Transactions; NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; - }; -class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> { +class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> { public: static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_TEST_WORKLOAD; } - TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, - const TActorId& parent, - const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, - ui64 index, - ui64 tag) + TKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, + const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, + ui64 index, ui64 tag) : Parent(parent) , Tag(tag) { @@ -318,7 +317,7 @@ public: WindowDuration = cmd.GetWindowDuration(); WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration; NumOfSessions = cmd.GetNumOfSessions(); - ChunkLoad.resize(WindowCount); + AnswersReceived.resize(WindowCount); Chunk.reserve(WindowCount); Total = std::make_unique<MonitoringData>(); for (size_t i = 0; i < WindowCount; ++i) { @@ -364,13 +363,13 @@ public: TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true); } - ~TKqpWriterTestLoadActor() { + ~TKqpLoadActor() { LoadCounters->ResetCounters(); } void Bootstrap(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called"); - Become(&TKqpWriterTestLoadActor::StateStart); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor Bootstrap called"); + Become(&TKqpLoadActor::StateStart); if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) { NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams()); @@ -429,9 +428,9 @@ private: } void StartDeathProcess(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor StartDeathProcess called"); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor StartDeathProcess called"); - Become(&TKqpWriterTestLoadActor::StateEndOfWork); + Become(&TKqpLoadActor::StateEndOfWork); if (DeleteTableOnFinish) { DropTables(ctx); @@ -509,28 +508,17 @@ private: << " Count: " << response->Data.WindowHist.GetTotalCount()); Chunk[response->Phase]->Add(response->Data); - ChunkLoad[response->Phase] += 1; + AnswersReceived[response->Phase] += 1; - if (ChunkLoad[Phase] == NumOfSessions) { + if (AnswersReceived[Phase] == NumOfSessions) { Total->Add(*Chunk[Phase]); - SendNewRowToParent(ctx); + UpdatePhase(ctx); } } - void SendNewRowToParent(const TActorContext& ctx) { + void UpdatePhase(const TActorContext& ctx) { Phase += 1; - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag - << " total: Phase: " << Phase << " -> " - << Total->WindowHist.GetTotalCount() << " | " - << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | " - << Total->WindowErrors << " | " - << Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0 << " | " - << Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0 << " | " - << Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0 << " | " - << Total->WindowHist.GetMax() / 1000.0 - ); - if (Phase >= WindowCount) { StartDeathProcess(ctx); } @@ -582,7 +570,7 @@ private: auto& response = ev->Get()->Record.GetRef(); if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { - Become(&TKqpWriterTestLoadActor::StateMain); + Become(&TKqpLoadActor::StateMain); LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created"); InitData = WorkloadQueryGen->GetInitialData(); InsertInitData(ctx); @@ -602,38 +590,14 @@ private: return; } + Y_VERIFY(!InitData.empty()); auto q = std::move(InitData.front()); InitData.pop_front(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag - << " Creating request for init query, need to exec: " << InitData.size() + 1); - - TString query_text = TString(q.Query); - - auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " using session: " << TableSession); - - request->Record.MutableRequest()->SetSessionId(TableSession); - request->Record.MutableRequest()->SetKeepSession(true); - request->Record.MutableRequest()->SetDatabase(WorkingDir); - - request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - request->Record.MutableRequest()->SetQuery(query_text); - - request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - - request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC); - - const auto& paramsMap = NYdb::TProtoAccessor::GetProtoMap(q.Params); - request->Record.MutableRequest()->MutableYdbParameters()->insert(paramsMap.begin(), paramsMap.end()); - - auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + << " Creating request for init query, need to exec: " << InitData.size() + 1 << " session: " << TableSession); - ctx.Send(kqp_proxy, request.Release()); + SendQueryRequest(ctx, q, TableSession, WorkingDir); } void HandleDataQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { @@ -774,7 +738,7 @@ private: // Monitoring std::vector<std::unique_ptr<MonitoringData>> Chunk; - std::vector<ui64> ChunkLoad; + std::vector<ui64> AnswersReceived; std::unique_ptr<MonitoringData> Total; ui64 Phase = 0; @@ -785,9 +749,9 @@ private: }; -IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, +IActor * CreateKqpLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) { - return new TKqpWriterTestLoadActor(cmd, parent, counters, index, tag); + return new TKqpLoadActor(cmd, parent, counters, index, tag); } } // NKikimr |