diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-06-01 11:13:05 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-06-01 11:13:05 +0300 |
commit | 613651f1b6810fbee85dc4ddb7b3cc5a0a0d665e (patch) | |
tree | 8e63d9cd6dc1b89b46deaeb5849fe68dcbd426d8 | |
parent | 941504046e3c6a83de90275a5f72d2084812ff0f (diff) | |
download | ydb-613651f1b6810fbee85dc4ddb7b3cc5a0a0d665e.tar.gz |
Avoid redundant stats for KQP datashard transactions. (KIKIMR-14782)
ref:2ab1c43b8deceeda3ea010f3d393ea550d09535f
27 files changed, 168 insertions, 80 deletions
diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp index 561bf1e2b1e..2286e4649ee 100644 --- a/ydb/core/grpc_services/rpc_commit_transaction.cpp +++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp @@ -71,6 +71,7 @@ private: ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx_id()); ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); ev->Record.MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats())); + ev->Record.MutableRequest()->SetCollectStats(req->collect_stats()); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 0f4a7072578..2a95665599e 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -116,6 +116,7 @@ public: ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control()); ev->Record.MutableRequest()->MutableQueryCachePolicy()->CopyFrom(req->query_cache_policy()); ev->Record.MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats())); + ev->Record.MutableRequest()->SetCollectStats(req->collect_stats()); auto& query = req->query(); switch (req->query().query_case()) { diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 2650d32df50..2df34825eb1 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -80,6 +80,7 @@ public: ev->Record.MutableRequest()->SetQuery(script); ev->Record.MutableRequest()->SetKeepSession(false); ev->Record.MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats())); + ev->Record.MutableRequest()->SetCollectStats(req->collect_stats()); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index be73ea6fb3c..b7733485957 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -153,13 +153,13 @@ bool FillKqpRequest(const Ydb::Experimental::ExecuteStreamQueryRequest& req, NKi switch (req.profile_mode()) { case Ydb::Experimental::ExecuteStreamQueryRequest_ProfileMode_PROFILE_MODE_UNSPECIFIED: case Ydb::Experimental::ExecuteStreamQueryRequest_ProfileMode_NONE: - kqpRequest.MutableRequest()->SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_NONE); + kqpRequest.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE); break; case Ydb::Experimental::ExecuteStreamQueryRequest_ProfileMode_BASIC: - kqpRequest.MutableRequest()->SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_BASIC); + kqpRequest.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC); break; case Ydb::Experimental::ExecuteStreamQueryRequest_ProfileMode_FULL: - kqpRequest.MutableRequest()->SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_PROFILE); + kqpRequest.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); break; default: YQL_ENSURE(false, "Unknown profile_mode " @@ -180,6 +180,7 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp:: case Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC: kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats())); + kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats()); break; case Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN: kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 04bd7d78ae8..3a0697e0293 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -91,6 +91,7 @@ namespace { kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING); kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats())); + kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats()); kqpRequest.MutableRequest()->SetKeepSession(false); kqpRequest.MutableRequest()->SetQuery(script); diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index c3b167f5155..d15793fd1c6 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -117,7 +117,7 @@ public: ui64 TotalReadSizeLimitBytes = 0; ui64 MkqlMemoryLimit = 0; // old engine compatibility ui64 PerShardKeysSizeLimitBytes = 0; - NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::DQ_STATS_MODE_NONE; + Ydb::Table::QueryStatsCollection::Mode StatsMode = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; bool DisableLlvmForUdfStages = false; bool LlvmEnabled = true; TKqpSnapshot Snapshot = TKqpSnapshot(); @@ -131,7 +131,7 @@ public: }; struct TAstQuerySettings { - NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::DQ_STATS_MODE_NONE; + Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; }; public: diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index b0d4947f654..25a374d211f 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1144,7 +1144,8 @@ private: } } - void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const TMaybe<ui64> lockTxId) + void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, + const TMaybe<ui64> lockTxId) { TShardState shardState; shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; @@ -1156,7 +1157,7 @@ private: kqpTx.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); } kqpTx.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); - kqpTx.MutableRuntimeSettings()->SetStatsMode(Request.StatsMode); + kqpTx.MutableRuntimeSettings()->SetStatsMode(GetDqStatsModeShard(Request.StatsMode)); kqpTx.MutableRuntimeSettings()->SetUseLLVM(false); kqpTx.MutableRuntimeSettings()->SetUseSpilling(false); @@ -1225,7 +1226,7 @@ private: //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery; settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified; settings.FailOnUndelivery = true; - settings.StatsMode = Request.StatsMode; + settings.StatsMode = GetDqStatsMode(Request.StatsMode); settings.UseLLVM = false; settings.UseSpilling = false; @@ -1650,7 +1651,7 @@ private: Stats->ResultRows = response.GetResult().ResultsSize(); Stats->Finish(); - if (Request.StatsMode == NYql::NDqProto::DQ_STATS_MODE_PROFILE) { + if (CollectFullStats(Request.StatsMode)) { for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { const auto& tx = Request.Transactions[txId].Body; auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 2a8818b2268..db5e0187f61 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -85,10 +85,8 @@ public: , Counters(counters) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); - if (Request.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC) { - Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, - ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); - } + Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, + ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); } void Bootstrap() { diff --git a/ydb/core/kqp/executer/kqp_executer_stats.cpp b/ydb/core/kqp/executer/kqp_executer_stats.cpp index 1119656f4b6..72ce31ce119 100644 --- a/ydb/core/kqp/executer/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer/kqp_executer_stats.cpp @@ -75,6 +75,44 @@ NDqProto::TDqTableAggrStats* GetOrCreateTableAggrStats(NDqProto::TDqStageStats* } // anonymous namespace +NYql::NDqProto::EDqStatsMode GetDqStatsMode(Ydb::Table::QueryStatsCollection::Mode mode) { + switch (mode) { + // Always collect basic stats for system views / request unit computation. + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: + return NYql::NDqProto::DQ_STATS_MODE_BASIC; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: + // TODO: Use DQ_STATS_MODE_PROFILE for STATS_COLLECTION_PROFILE mode (KIKIMR-15020) + return NYql::NDqProto::DQ_STATS_MODE_PROFILE; + default: + return NYql::NDqProto::DQ_STATS_MODE_NONE; + } +} + +NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollection::Mode mode) { + switch (mode) { + // Collect only minimal required stats to improve datashard performance. + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: + return NYql::NDqProto::DQ_STATS_MODE_NONE; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: + return NYql::NDqProto::DQ_STATS_MODE_BASIC; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: + // TODO: Use DQ_STATS_MODE_PROFILE for STATS_COLLECTION_PROFILE mode (KIKIMR-15020) + return NYql::NDqProto::DQ_STATS_MODE_PROFILE; + default: + return NYql::NDqProto::DQ_STATS_MODE_NONE; + } +} + +bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode) { + return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL; +} + +bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) { + Y_UNUSED(statsMode); + return false; +} + void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats) { // Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl); @@ -103,7 +141,7 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt } } - if (StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE) { + if (CollectFullStats(StatsMode)) { for (auto& task : stats.GetTasks()) { auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); @@ -177,7 +215,7 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat } } - if (StatsMode == NDqProto::DQ_STATS_MODE_PROFILE) { + if (CollectFullStats(StatsMode)) { for (auto& task : *stats.MutableTasks()) { auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result); @@ -242,7 +280,7 @@ void TQueryExecutionStats::Finish() { ExtraStats.SetAffectedShards(AffectedShards.size()); Result->MutableExtra()->PackFrom(ExtraStats); - if (StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE) { + if (CollectFullStats(StatsMode)) { Result->SetExecuterCpuTimeUs(ExecuterCpuTime.MicroSeconds()); Result->SetStartTimeMs(StartTs.MilliSeconds()); diff --git a/ydb/core/kqp/executer/kqp_executer_stats.h b/ydb/core/kqp/executer/kqp_executer_stats.h index 56c79213183..4c936207bca 100644 --- a/ydb/core/kqp/executer/kqp_executer_stats.h +++ b/ydb/core/kqp/executer/kqp_executer_stats.h @@ -7,8 +7,14 @@ namespace NKikimr { namespace NKqp { +NYql::NDqProto::EDqStatsMode GetDqStatsMode(Ydb::Table::QueryStatsCollection::Mode mode); +NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollection::Mode mode); + +bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode); +bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode); + struct TQueryExecutionStats { - const NYql::NDqProto::EDqStatsMode StatsMode; + const Ydb::Table::QueryStatsCollection::Mode StatsMode; const TKqpTasksGraph* const TasksGraph = nullptr; NYql::NDqProto::TDqExecutionStats* const Result; @@ -32,13 +38,12 @@ struct TQueryExecutionStats { TDuration ResolveWallTime; TVector<NKikimrQueryStats::TTxStats> DatashardStats; - TQueryExecutionStats(NYql::NDqProto::EDqStatsMode statsMode, const TKqpTasksGraph* const tasksGraph, + TQueryExecutionStats(Ydb::Table::QueryStatsCollection::Mode statsMode, const TKqpTasksGraph* const tasksGraph, NYql::NDqProto::TDqExecutionStats* const result) : StatsMode(statsMode) , TasksGraph(tasksGraph) , Result(result) { - YQL_ENSURE(StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC); } void AddComputeActorStats(ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats); diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index 57f9c72af29..b5c1a34feaf 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -31,10 +31,11 @@ TDqTaskRunnerContext CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* c return context; } -TDqTaskRunnerSettings CreateTaskRunnerSettings(NDqProto::EDqStatsMode statsMode) { +TDqTaskRunnerSettings CreateTaskRunnerSettings(Ydb::Table::QueryStatsCollection::Mode statsMode) { TDqTaskRunnerSettings settings; - settings.CollectBasicStats = statsMode >= NDqProto::DQ_STATS_MODE_BASIC; - settings.CollectProfileStats = statsMode >= NDqProto::DQ_STATS_MODE_PROFILE; + // Always collect basic stats for system views / request unit computation. + settings.CollectBasicStats = true; + settings.CollectProfileStats = CollectProfileStats(statsMode); settings.OptLLVM = "OFF"; settings.TerminateOnError = false; settings.AllowGeneratorsInUnboxedValues = false; @@ -66,10 +67,8 @@ public: , Counters(counters) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(); - if (Request.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { - Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, - ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); - } + Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, + ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); } void Bootstrap() { @@ -300,7 +299,7 @@ private: auto taskCpuTime = stats->BuildCpuTime + stats->ComputeCpuTime; executerCpuTime -= taskCpuTime; NYql::NDq::FillTaskRunnerStats(taskRunner->GetTaskId(), TaskId2StageId[taskRunner->GetTaskId()], - *stats, fakeComputeActorStats.AddTasks(), Request.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE); + *stats, fakeComputeActorStats.AddTasks(), CollectProfileStats(Request.StatsMode)); fakeComputeActorStats.SetCpuTimeUs(fakeComputeActorStats.GetCpuTimeUs() + taskCpuTime.MicroSeconds()); } @@ -315,7 +314,7 @@ private: Stats->Finish(); - if (Y_UNLIKELY(Request.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) { + if (Y_UNLIKELY(CollectFullStats(Request.StatsMode))) { for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { const auto& tx = Request.Transactions[txId].Body; auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index 2622d761cb4..6c7b2068a3c 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -1,3 +1,4 @@ +#include "kqp_executer_stats.h" #include "kqp_planner.h" #include "kqp_planner_strategy.h" #include "kqp_shards_resolver.h" @@ -23,8 +24,9 @@ constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, - const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const NYql::NDqProto::EDqStatsMode& statsMode, - bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) + const TString& database, const TMaybe<TString>& userToken, TInstant deadline, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) : TxId(txId) , ExecuterId(executer) , Tasks(std::move(tasks)) @@ -262,7 +264,7 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest( } ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN); - ev->Record.MutableRuntimeSettings()->SetStatsMode(StatsMode); + ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); ev->Record.MutableRuntimeSettings()->SetUseLLVM(withLLVM); ev->Record.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); @@ -318,8 +320,9 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, - const TString& database, const TMaybe<TString>& token, TInstant deadline, const NYql::NDqProto::EDqStatsMode& statsMode, - bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) + const TString& database, const TMaybe<TString>& token, TInstant deadline, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath) { return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot, database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath); diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h index dc5e993a74d..b2d2549c394 100644 --- a/ydb/core/kqp/executer/kqp_planner.h +++ b/ydb/core/kqp/executer/kqp_planner.h @@ -33,8 +33,9 @@ class TKqpPlanner : public TActorBootstrapped<TKqpPlanner> { public: TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, - const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const NYql::NDqProto::EDqStatsMode& statsMode, - bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); + const TString& database, const TMaybe<TString>& userToken, TInstant deadline, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, + bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); void Bootstrap(const TActorContext& ctx); @@ -63,7 +64,7 @@ private: TString Database; const TMaybe<TString> UserToken; const TInstant Deadline; - const NYql::NDqProto::EDqStatsMode StatsMode; + const Ydb::Table::QueryStatsCollection::Mode StatsMode; const bool DisableLlvmForUdfStages; const bool EnableLlvm; const bool WithSpilling; @@ -73,7 +74,8 @@ private: IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, - const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const NYql::NDqProto::EDqStatsMode& statsMode, - bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); + const TString& database, const TMaybe<TString>& userToken, TInstant deadline, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index d871278c0a2..cf41f872452 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -952,7 +952,7 @@ private: Stats->FinishTs = TInstant::Now(); Stats->Finish(); - if (Request.StatsMode == NYql::NDqProto::DQ_STATS_MODE_PROFILE) { + if (CollectFullStats(Request.StatsMode)) { const auto& tx = Request.Transactions[0].Body; auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index b7e5089b48e..2dfbc5a1678 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -840,7 +840,7 @@ public: bool useScanQuery = ShouldUseScanQuery(dataQuery, settings); IKqpGateway::TAstQuerySettings querySettings; - querySettings.StatsMode = GetStatsMode(settings.StatsMode); + querySettings.CollectStats = GetStatsMode(settings.StatsMode); TFuture<TQueryResult> future; switch (queryType) { @@ -2247,14 +2247,14 @@ private: } // namespace -NDqProto::EDqStatsMode GetStatsMode(NYql::EKikimrStatsMode statsMode) { +Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode statsMode) { switch (statsMode) { case NYql::EKikimrStatsMode::Basic: - return NYql::NDqProto::DQ_STATS_MODE_BASIC; - case NYql::EKikimrStatsMode::Profile: - return NYql::NDqProto::DQ_STATS_MODE_PROFILE; + return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC; + case NYql::EKikimrStatsMode::Full: + return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL; default: - return NYql::NDqProto::DQ_STATS_MODE_NONE; + return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; } } diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index ff4c8930989..51226338b30 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -8,7 +8,7 @@ namespace NKikimr { namespace NKqp { -NYql::NDqProto::EDqStatsMode GetStatsMode(NYql::EKikimrStatsMode statsMode); +Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode statsMode); template<typename TResult, bool copyIssues = true> class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> { diff --git a/ydb/core/kqp/host/kqp_run_prepared.cpp b/ydb/core/kqp/host/kqp_run_prepared.cpp index 4f65bb63aac..b200da0f961 100644 --- a/ydb/core/kqp/host/kqp_run_prepared.cpp +++ b/ydb/core/kqp/host/kqp_run_prepared.cpp @@ -139,7 +139,7 @@ private: YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL parameters:" << Endl << paramsTextBuilder; } - if (TransformCtx->QueryCtx->StatsMode == EKikimrStatsMode::Profile) { + if (TransformCtx->QueryCtx->StatsMode == EKikimrStatsMode::Full) { MkqlExecuteResult.Program = mkql.GetProgramText(); } diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index dad550359de..f99e1333b39 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -1677,7 +1677,7 @@ public: ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_SCAN); ev->Record.MutableRequest()->SetQuery(query); ev->Record.MutableRequest()->SetKeepSession(false); - ev->Record.MutableRequest()->SetStatsMode(settings.StatsMode); + ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); if (!params.Values.empty()) { FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); @@ -1711,7 +1711,7 @@ public: ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML); ev->Record.MutableRequest()->SetQuery(query); ev->Record.MutableRequest()->SetKeepSession(false); - ev->Record.MutableRequest()->SetStatsMode(settings.StatsMode); + ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); if (!params.Values.empty()) { FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); @@ -1751,7 +1751,7 @@ public: ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_SCAN); ev->Record.MutableRequest()->SetQuery(query); ev->Record.MutableRequest()->SetKeepSession(false); - ev->Record.MutableRequest()->SetStatsMode(settings.StatsMode); + ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); if (!params.Values.empty()) { FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); @@ -1811,7 +1811,7 @@ public: ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML); ev->Record.MutableRequest()->SetQuery(query); ev->Record.MutableRequest()->SetKeepSession(false); - ev->Record.MutableRequest()->SetStatsMode(settings.StatsMode); + ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); if (!params.Values.empty()) { FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index e235ca3afe4..22b7662b162 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -97,14 +97,16 @@ struct TKqpCleanupCtx { TInstant Start = TInstant::Now(); }; -EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) { - switch (queryRequest.GetStatsMode()) { - case NYql::NDqProto::DQ_STATS_MODE_BASIC: +EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest) { + switch (queryRequest.GetCollectStats()) { + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: + return EKikimrStatsMode::None; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: return EKikimrStatsMode::Basic; - case NYql::NDqProto::DQ_STATS_MODE_PROFILE: - return EKikimrStatsMode::Profile; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: + return EKikimrStatsMode::Full; default: - return std::max(EKikimrStatsMode::None, minMode); + return EKikimrStatsMode::None; } } @@ -709,7 +711,7 @@ public: request.CancelAfter = cancelAt - now; } - EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic); + EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request); request.StatsMode = GetStatsMode(statsMode); request.Snapshot = queryState->TxCtx->GetSnapshot(); @@ -735,7 +737,7 @@ public: request.Timeout = TDuration::MilliSeconds(1); } request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef(); - EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic); + EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request); request.StatsMode = GetStatsMode(statsMode); request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages(); request.LlvmEnabled = Config->GetEnableLlvm() != EOptionalFlag::Disabled; @@ -1254,7 +1256,7 @@ public: ); } - bool reportStats = (GetStatsModeInt(queryRequest, EKikimrStatsMode::None) != EKikimrStatsMode::None); + bool reportStats = (GetStatsModeInt(queryRequest) != EKikimrStatsMode::None); if (reportStats) { FillQueryProfile(*stats, *response); response->SetQueryPlan(SerializeAnalyzePlan(*stats)); diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 47dc15e97a3..f3429cb0d29 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -76,7 +76,6 @@ struct TKqpQueryState { TMaybe<NKikimrKqp::TRlPath> RlPath; }; - struct TKqpCleanupState { bool Final = false; TInstant Start; @@ -84,16 +83,24 @@ struct TKqpCleanupState { }; EKikimrStatsMode GetStatsMode(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) { - if (queryRequest.GetProfile()) { - // TODO: Deprecate, StatsMode is the new way to enable stats. - return EKikimrStatsMode::Profile; + if (queryRequest.HasCollectStats()) { + switch (queryRequest.GetCollectStats()) { + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: + return EKikimrStatsMode::None; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: + return EKikimrStatsMode::Basic; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: + return EKikimrStatsMode::Full; + default: + return EKikimrStatsMode::None; + } } switch (queryRequest.GetStatsMode()) { case NYql::NDqProto::DQ_STATS_MODE_BASIC: return EKikimrStatsMode::Basic; case NYql::NDqProto::DQ_STATS_MODE_PROFILE: - return EKikimrStatsMode::Profile; + return EKikimrStatsMode::Full; default: return std::max(EKikimrStatsMode::None, minMode); } @@ -1657,11 +1664,7 @@ private: } bool reportStats = (GetStatsMode(queryRequest, EKikimrStatsMode::None) != EKikimrStatsMode::None); - if (reportStats) { - // TODO: For compatibility with old rpc handlers, deprecate. - FillQueryProfile(stats, *record.MutableResponse()); - record.MutableResponse()->MutableQueryStats()->Swap(&stats); record.MutableResponse()->SetQueryPlan(queryResult.QueryPlan); } diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 6a66de83567..8f9a1194e17 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1086,7 +1086,7 @@ private: auto profile = config->Profile.Get(cluster); if (profile && *profile) { // Do not disable profiling if it was enabled at request level - settings.StatsMode = EKikimrStatsMode::Profile; + settings.StatsMode = EKikimrStatsMode::Full; } asyncResult = runFunc(settings); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 357fed94607..647849c310c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -28,7 +28,7 @@ struct TKikimrQueryDeadlines { enum class EKikimrStatsMode { None = 0, Basic = 1, - Profile = 2 + Full = 2, }; class IKikimrQueryExecutor : public TThrRefBase { diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp index 417ab0b719d..c9976951202 100644 --- a/ydb/core/kqp/ut/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp @@ -300,6 +300,33 @@ Y_UNIT_TEST_QUAD(RequestUnitForSuccessExplicitPrepare, UseNewEngine, UseSessionA UNIT_ASSERT(result.GetConsumedRu() > 1); } +Y_UNIT_TEST_QUAD(RequestUnitForExecute, UseNewEngine, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine && UseSessionActor); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto query = Q1_(R"( + SELECT COUNT(*) FROM TwoShard; + )"); + + auto settings = TExecDataQuerySettings() + .KeepInQueryCache(true) + .ReportCostInfo(true); + + // Cached/uncached executions + for (ui32 i = 0; i < 2; ++i) { + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << "Consumed units: " << result.GetConsumedRu() << Endl; + UNIT_ASSERT(result.GetConsumedRu() > 1); + + auto ru = result.GetResponseMetadata().find(NYdb::YDB_CONSUMED_UNITS_HEADER); + UNIT_ASSERT(ru != result.GetResponseMetadata().end()); + UNIT_ASSERT(atoi(ru->second.c_str()) > 1); + } +} + } // suite } // namespace NKqp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index a4fc4571041..b60b518aed3 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -91,7 +91,8 @@ message TQueryRequest { optional uint64 TimeoutMs = 17; optional Ydb.YqlInternal.ExecQuerySettings QuerySettings = 18; reserved 19; // (deprecated) StatsMode - optional NYql.NDqProto.EDqStatsMode StatsMode = 20; + optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated + optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21; } message TKqpPathIdProto { diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index fa794ce6f1d..7d0061c1262 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -695,7 +695,8 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::T if (tx.HasRuntimeSettings() && tx.GetRuntimeSettings().HasStatsMode()) { auto statsMode = tx.GetRuntimeSettings().GetStatsMode(); - settings.CollectBasicStats = statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC; + // Always collect basic stats for system views / request unit computation. + settings.CollectBasicStats = true; settings.CollectProfileStats = statsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; } else { settings.CollectBasicStats = false; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 344acc9903a..4bf6b7010a9 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -705,6 +705,7 @@ void KqpFillStats(TDataShard& dataShard, const NKqp::TKqpTasksRunner& tasksRunne auto tableInfo = dataShard.GetUserTables().begin(); // Directly use StatsMode instead of bool flag, too much is reported for STATS_COLLECTION_BASIC mode. + bool withBasicStats = statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC; bool withProfileStats = statsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; auto& computeActorStats = *result.Record.MutableComputeActorStats(); @@ -713,9 +714,8 @@ void KqpFillStats(TDataShard& dataShard, const NKqp::TKqpTasksRunner& tasksRunne ui64 maxFinishTimeMs = 0; for (auto& [taskId, taskStats] : tasksRunner.GetTasksStats()) { + // Always report statistics required for system views & request unit computation auto* protoTask = computeActorStats.AddTasks(); - auto stageId = tasksRunner.GetTask(taskId).GetStageId(); - NYql::NDq::FillTaskRunnerStats(taskId, stageId, *taskStats, protoTask, withProfileStats); auto taskTableStats = computeCtx.GetTaskCounters(taskId); @@ -727,6 +727,13 @@ void KqpFillStats(TDataShard& dataShard, const NKqp::TKqpTasksRunner& tasksRunne protoTable->SetWriteBytes(taskTableStats.UpdateRowBytes); protoTable->SetEraseRows(taskTableStats.NEraseRow); + if (!withBasicStats) { + continue; + } + + auto stageId = tasksRunner.GetTask(taskId).GetStageId(); + NYql::NDq::FillTaskRunnerStats(taskId, stageId, *taskStats, protoTask, withProfileStats); + minFirstRowTimeMs = std::min(minFirstRowTimeMs, protoTask->GetFirstRowTimeMs()); maxFinishTimeMs = std::max(maxFinishTimeMs, protoTask->GetFinishTimeMs()); @@ -735,7 +742,6 @@ void KqpFillStats(TDataShard& dataShard, const NKqp::TKqpTasksRunner& tasksRunne if (Y_UNLIKELY(withProfileStats)) { NKqpProto::TKqpShardTableExtraStats tableExtraStats; tableExtraStats.SetShardId(dataShard.TabletID()); - // tableExtraStats.SetShardCpuTimeUs(...); // TODO: take it from TTxStats protoTable->MutableExtra()->PackFrom(tableExtraStats); } } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 7f885944124..dda3a8d8585 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -187,10 +187,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode(); - if (statsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC) { - KqpFillTxStats(DataShard, dataTx->GetCounters(), *op->Result()); - KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result()); - } + KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result()); } catch (const TMemoryLimitExceededException&) { txc.NotEnoughMemory(); |