diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 06:50:04 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 06:50:04 +0300 |
commit | f577e5f412ee2a58aa7121733735fc8dd5c0fa2e (patch) | |
tree | a1f6116371f06f87eb14845876f7276a21aa43a3 | |
parent | f58a59b3ab6a6bdbd3feae3007b14426b2bdd7ad (diff) | |
download | ydb-f577e5f412ee2a58aa7121733735fc8dd5c0fa2e.tar.gz |
automatic for llvm usage
41 files changed, 181 insertions, 125 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 23096d9ed5..38cfd658a3 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -279,11 +279,12 @@ private: if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); - if (Config->EnableLlvm.Get()) { - kqpResult.PreparingQuery->SetEnableLlvm(*Config->EnableLlvm.Get()); + { + auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>( + kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry); + preparedQueryHolder->MutableLlvmSettings().Fill(Config, Query.QueryType); + KqpCompileResult->PreparedQuery = preparedQueryHolder; } - KqpCompileResult->PreparedQuery = std::make_shared<const TPreparedQueryHolder>( - kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry); auto now = TInstant::Now(); auto duration = now - StartTime; diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 768440da92..e038a56550 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -76,7 +76,8 @@ public: TDqTaskRunnerSettings settings; settings.CollectBasicStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_BASIC; settings.CollectProfileStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; - settings.OptLLVM = GetUseLLVM() ? "--compile-options=disable-opt" : "OFF"; + + settings.OptLLVM = (GetTask().HasUseLlvm() && GetTask().GetUseLlvm()) ? "--compile-options=disable-opt" : "OFF"; settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 9faa89e6d1..80e5a5144d 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -194,7 +194,7 @@ void TKqpScanComputeActor::DoBootstrap() { NDq::TDqTaskRunnerSettings settings; settings.CollectBasicStats = GetStatsMode() >= NYql::NDqProto::DQ_STATS_MODE_BASIC; settings.CollectProfileStats = GetStatsMode() >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; - settings.OptLLVM = TBase::GetUseLLVM() ? "--compile-options=disable-opt" : "OFF"; + settings.OptLLVM = (GetTask().HasUseLlvm() && GetTask().GetUseLlvm()) ? "--compile-options=disable-opt" : "OFF"; settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ffa91e888a..784988357b 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1533,7 +1533,6 @@ private: kqpTx.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); kqpTx.MutableRuntimeSettings()->SetStatsMode(GetDqStatsModeShard(Request.StatsMode)); - kqpTx.MutableRuntimeSettings()->SetUseLLVM(false); kqpTx.MutableRuntimeSettings()->SetUseSpilling(false); NKikimrTxDataShard::TDataTransaction dataTransaction; @@ -1620,7 +1619,6 @@ private: settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified; settings.FailOnUndelivery = true; settings.StatsMode = GetDqStatsMode(Request.StatsMode); - settings.UseLLVM = false; settings.UseSpilling = false; TComputeMemoryLimits limits; @@ -2148,8 +2146,7 @@ private: } Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), - Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), + Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); auto err = Planner->PlanExecution(); if (err) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 1c8f754870..3a091b266e 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -85,7 +85,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory); + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index b0f517dab6..e102048564 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -91,7 +91,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction @@ -115,7 +115,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory)); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig); + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory)); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 554ccd2045..6245131d49 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1098,7 +1098,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 60d0f6fdfc..8bb42587a0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -306,6 +306,15 @@ void TQueryExecutionStats::Finish() { ExtraStats.SetAffectedShards(AffectedShards.size()); if (CollectProfileStats(StatsMode)) { + for (auto&& s : UseLlvmByStageId) { + for (auto&& pbs : *Result->MutableStages()) { + if (pbs.GetStageId() == s.first) { + pbs.SetUseLlvm(s.second); + break; + } + } + } + for (auto&& s : ShardsCountByNode) { for (auto&& pbs : *Result->MutableStages()) { if (pbs.GetStageId() == s.first) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 521f4f1c8d..5cae2ad4aa 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -16,6 +16,7 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode); struct TQueryExecutionStats { private: std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode; + std::map<ui32, bool> UseLlvmByStageId; public: const Ydb::Table::QueryStatsCollection::Mode StatsMode; const TKqpTasksGraph* const TasksGraph = nullptr; @@ -53,6 +54,9 @@ public: void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) { Y_VERIFY(ShardsCountByNode[stageId].emplace(nodeId, shardsCount).second); } + void SetUseLlvm(const ui32 stageId, const bool value) { + Y_VERIFY(UseLlvmByStageId.emplace(stageId, value).second); + } void AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&& txStats); void AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, NKikimrQueryStats::TTxStats&& txStats); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index e23a834f03..a5237139f7 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -55,7 +55,7 @@ constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; TKqpPlanner::TKqpPlanner(const TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, @@ -69,8 +69,6 @@ TKqpPlanner::TKqpPlanner(const TKqpTasksGraph& graph, ui64 txId, const TActorId& , UserToken(userToken) , Deadline(deadline) , StatsMode(statsMode) - , DisableLlvmForUdfStages(disableLlvmForUdfStages) - , EnableLlvm(enableLlvm) , WithSpilling(withSpilling) , RlPath(rlPath) , ResourcesSnapshot(std::move(resourcesSnapshot)) @@ -147,28 +145,19 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); } - bool enableLlvm = EnableLlvm; - for (ui64 taskId : requestData.TaskIds) { const auto& task = TasksGraph.GetTask(taskId); auto serializedTask = SerializeTaskToProto(TasksGraph, task); - if (DisableLlvmForUdfStages && serializedTask.GetProgram().GetSettings().GetHasUdf()) { - enableLlvm = false; - } request.AddTasks()->Swap(&serializedTask); } + request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); + request.SetStartAllOrFail(true); if (IsDataQuery) { request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA); - request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); - request.MutableRuntimeSettings()->SetUseLLVM(false); - request.SetStartAllOrFail(true); } else { request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN); - request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); - request.MutableRuntimeSettings()->SetUseLLVM(enableLlvm); request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); - request.SetStartAllOrFail(true); } if (RlPath) { @@ -362,13 +351,13 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool isDataQuery) { return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot, - database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, + database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 1f5766eea8..8505d782d2 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -37,8 +37,8 @@ public: TKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, - bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool isDataQuery); @@ -66,8 +66,6 @@ private: const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; const TInstant Deadline; const Ydb::Table::QueryStatsCollection::Mode StatsMode; - const bool DisableLlvmForUdfStages; - const bool EnableLlvm; const bool WithSpilling; const TMaybe<NKikimrKqp::TRlPath> RlPath; THashSet<ui32> TrackingNodes; @@ -84,7 +82,7 @@ private: std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 438354e7b9..9c4a5c2708 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -50,7 +50,7 @@ TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpRead class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> { using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>; - + TPreparedQueryHolder::TConstPtr PreparedQuery; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; @@ -59,8 +59,9 @@ public: TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::ScanExecuter, "ScanExecuter") + , PreparedQuery(preparedQuery) , AggregationSettings(aggregation) { YQL_ENSURE(Request.Transactions.size() == 1); @@ -537,6 +538,19 @@ private: YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind); } + { + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); + const bool useLlvm = PreparedQuery ? PreparedQuery->GetLlvmSettings().GetUseLlvm(stage.GetProgram().GetSettings()) : false; + for (auto& taskId : stageInfo.Tasks) { + auto& task = TasksGraph.GetTask(taskId); + task.SetUseLlvm(useLlvm); + } + if (Stats && CollectProfileStats(Request.StatsMode)) { + Stats->SetUseLlvm(stageInfo.Id.StageId, useLlvm); + } + + } + if (stage.GetIsSinglePartition()) { YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage"); } @@ -665,8 +679,7 @@ private: Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks), std::move(tasksPerNode), GetSnapshot(), - Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, + Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */); LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size()); auto err = Planner->PlanExecution(); @@ -723,9 +736,9 @@ private: IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery) { - return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig); + return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index f3077c54e5..7d2fcda322 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -980,6 +980,7 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId()); result.SetId(task.Id); result.SetStageId(stageInfo.Id.StageId); + result.SetUseLlvm(task.GetUseLlvm()); if (task.HasMetaId()) { result.SetMetaId(task.GetMetaIdUnsafe()); } diff --git a/ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt index 1f61f525b3..4040c20d8f 100644 --- a/ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(core-kqp-gateway PUBLIC providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp ) diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt index d863010351..f890eae46d 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-gateway PUBLIC providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp ) diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt index d863010351..f890eae46d 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-gateway PUBLIC providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp ) diff --git a/ydb/core/kqp/gateway/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/gateway/CMakeLists.windows-x86_64.txt index 1f61f525b3..4040c20d8f 100644 --- a/ydb/core/kqp/gateway/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.windows-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(core-kqp-gateway PUBLIC providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp ) diff --git a/ydb/core/kqp/gateway/kqp_gateway.cpp b/ydb/core/kqp/gateway/kqp_gateway.cpp new file mode 100644 index 0000000000..d859c39934 --- /dev/null +++ b/ydb/core/kqp/gateway/kqp_gateway.cpp @@ -0,0 +1,5 @@ +#include "kqp_gateway.h" +#include <ydb/core/kqp/common/kqp.h> +namespace NKikimr::NKqp { + +} diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index e814922e0d..c196a6c1d6 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -10,6 +10,7 @@ #include <ydb/core/kqp/provider/yql_kikimr_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_settings.h> #include <ydb/core/tx/long_tx_service/public/lock_handle.h> +#include <ydb/library/accessor/accessor.h> #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/common/dq_value.h> @@ -18,8 +19,7 @@ #include <library/cpp/lwtrace/shuttle.h> #include <library/cpp/protobuf/util/pb_io.h> -namespace NKikimr { -namespace NKqp { +namespace NKikimr::NKqp { const TStringBuf ParamNamePrefix = "%kqp%"; @@ -102,6 +102,7 @@ public: }; struct TExecPhysicalRequest : private TMoveOnly { + public: TExecPhysicalRequest(NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) : TxAlloc(txAlloc) @@ -123,8 +124,6 @@ public: ui64 MkqlMemoryLimit = 0; // old engine compatibility ui64 PerShardKeysSizeLimitBytes = 0; Ydb::Table::QueryStatsCollection::Mode StatsMode = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; - bool DisableLlvmForUdfStages = false; - bool LlvmEnabled = true; TKqpSnapshot Snapshot = TKqpSnapshot(); NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe<NKikimrKqp::TRlPath> RlPath; @@ -176,8 +175,7 @@ TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const T std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters); -} // namespace NKqp -} // namespace NKikimr +} // namespace NKikimr::NKqp template<> struct THash<NKikimr::NKqp::IKqpGateway::TKqpSnapshot> { diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 173bc63814..82e143790c 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -362,7 +362,6 @@ private: runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN; runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode(); - runtimeSettingsBase.UseLLVM = msgRtSettings.GetUseLLVM(); runtimeSettingsBase.UseSpilling = msgRtSettings.GetUseSpilling(); if (msgRtSettings.HasRlPath()) { diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index c523a703ae..ef3638024f 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -1702,6 +1702,11 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD if (auto stageGuid = node.GetMapSafe().FindPtr("StageGuid")) { if (auto stat = stages.FindPtr(stageGuid->GetStringSafe())) { auto& stats = node["Stats"]; + if ((*stat)->HasUseLlvm()) { + stats["UseLlvm"] = (*stat)->GetUseLlvm(); + } else { + stats["UseLlvm"] = "undefined"; + } stats["TotalTasks"] = (*stat)->GetTotalTasksCount(); stats["TotalDurationMs"] = (*stat)->GetFinishTimeMs().GetMax() - (*stat)->GetFirstRowTimeMs().GetMin(); @@ -1719,7 +1724,6 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD nodeInfo.InsertValue("shards_count", i.GetShardsCount()); nodeInfo.InsertValue("node_id", i.GetNodeId()); } - } for (auto& caStats : (*stat)->GetComputeActors()) { diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 4bf25f7e7b..4f418f5bfe 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -49,7 +49,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, _DefaultCluster); REGISTER_SETTING(*this, _ResultRowsLimit); REGISTER_SETTING(*this, EnableSystemColumns); - REGISTER_SETTING(*this, EnableLlvm); + REGISTER_SETTING(*this, UseLlvm); REGISTER_SETTING(*this, OptDisableJoinRewrite); REGISTER_SETTING(*this, OptDisableJoinTableLookup); @@ -126,8 +126,8 @@ EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const { return GetOptionalFlagValue(OptEnablePredicateExtract.Get()); } -EOptionalFlag TKikimrSettings::GetEnableLlvm() const { - return GetOptionalFlagValue(EnableLlvm.Get()); +EOptionalFlag TKikimrSettings::GetUseLlvm() const { + return GetOptionalFlagValue(UseLlvm.Get()); } TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const { diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 08127e4c3d..ce461a9c1f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -39,7 +39,7 @@ struct TKikimrSettings { NCommon::TConfSetting<TString, false> _DefaultCluster; NCommon::TConfSetting<ui32, false> _ResultRowsLimit; NCommon::TConfSetting<bool, false> EnableSystemColumns; - NCommon::TConfSetting<bool, false> EnableLlvm; + NCommon::TConfSetting<bool, false> UseLlvm; /* Disable optimizer rules */ NCommon::TConfSetting<bool, false> OptDisableJoinRewrite; @@ -72,7 +72,7 @@ struct TKikimrSettings { bool HasOptEnableOlapPushdown() const; bool HasOptUseFinalizeByKey() const; EOptionalFlag GetOptPredicateExtract() const; - EOptionalFlag GetEnableLlvm() const; + EOptionalFlag GetUseLlvm() const; // WARNING: For testing purposes only, inplace update is not ready for production usage. bool HasOptEnableInplaceUpdate() const; diff --git a/ydb/core/kqp/query_data/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/query_data/CMakeLists.darwin-x86_64.txt index bf9b5d0202..48ebb78260 100644 --- a/ydb/core/kqp/query_data/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/query_data/CMakeLists.darwin-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-query_data PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + kqp-common-simple yql-dq-expr_nodes yql-dq-proto core-kqp-expr_nodes diff --git a/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt index a37be07214..b0872af254 100644 --- a/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-query_data PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + kqp-common-simple yql-dq-expr_nodes yql-dq-proto core-kqp-expr_nodes diff --git a/ydb/core/kqp/query_data/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/query_data/CMakeLists.linux-x86_64.txt index a37be07214..b0872af254 100644 --- a/ydb/core/kqp/query_data/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/query_data/CMakeLists.linux-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-query_data PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + kqp-common-simple yql-dq-expr_nodes yql-dq-proto core-kqp-expr_nodes diff --git a/ydb/core/kqp/query_data/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/query_data/CMakeLists.windows-x86_64.txt index bf9b5d0202..48ebb78260 100644 --- a/ydb/core/kqp/query_data/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/query_data/CMakeLists.windows-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-query_data PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + kqp-common-simple yql-dq-expr_nodes yql-dq-proto core-kqp-expr_nodes diff --git a/ydb/core/kqp/query_data/kqp_predictor.cpp b/ydb/core/kqp/query_data/kqp_predictor.cpp index cd743b89ee..d5696e3eff 100644 --- a/ydb/core/kqp/query_data/kqp_predictor.cpp +++ b/ydb/core/kqp/query_data/kqp_predictor.cpp @@ -37,16 +37,17 @@ void TStagePredictor::Prepare() { void TStagePredictor::Scan(const NYql::TExprNode::TPtr& stageNode) { NYql::VisitExpr(stageNode, [&](const NYql::TExprNode::TPtr& exprNode) { NYql::NNodes::TExprBase node(exprNode); - if (node.Maybe<NYql::NNodes::TKqpWideReadTable>()) { + ++NodesCount; + if (node.Maybe<NYql::NNodes::TCoCondense>() || node.Ref().Content() == "WideCondense1" || node.Maybe<NYql::NNodes::TCoCondense1>()) { + HasCondenseFlag = true; + } else if (node.Maybe<NYql::NNodes::TKqpWideReadTable>()) { HasRangeScanFlag = true; } else if (node.Maybe<NYql::NNodes::TKqpLookupTable>()) { HasLookupFlag = true; } else if (node.Maybe<NYql::NNodes::TKqpUpsertRows>()) { } else if (node.Maybe<NYql::NNodes::TKqpDeleteRows>()) { - } else if (node.Maybe<NYql::NNodes::TKqpWideReadTableRanges>()) { - HasRangeScanFlag = true; - } else if (node.Maybe<NYql::NNodes::TKqpWideReadOlapTableRanges>()) { + } else if (node.Maybe<NYql::NNodes::TKqpWideReadTableRanges>() || node.Maybe<NYql::NNodes::TKqpWideReadOlapTableRanges>()) { HasRangeScanFlag = true; } else if (node.Maybe<NYql::NNodes::TCoSort>()) { HasSortFlag = true; @@ -89,7 +90,9 @@ void TStagePredictor::SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings kqpProto.SetHasFilter(HasFilterFlag); kqpProto.SetHasTop(HasTopFlag); kqpProto.SetHasRangeScan(HasRangeScanFlag); + kqpProto.SetHasCondense(HasCondenseFlag); kqpProto.SetHasLookup(HasLookupFlag); + kqpProto.SetNodesCount(NodesCount); kqpProto.SetInputDataPrediction(InputDataPrediction); kqpProto.SetOutputDataPrediction(OutputDataPrediction); kqpProto.SetStageLevel(StageLevel); @@ -106,7 +109,9 @@ bool TStagePredictor::DeserializeFromKqpSettings(const NYql::NDqProto::TProgram: HasFilterFlag = kqpProto.GetHasFilter(); HasTopFlag = kqpProto.GetHasTop(); HasRangeScanFlag = kqpProto.GetHasRangeScan(); + HasCondenseFlag = kqpProto.GetHasCondense(); HasLookupFlag = kqpProto.GetHasLookup(); + NodesCount = kqpProto.GetNodesCount(); InputDataPrediction = kqpProto.GetInputDataPrediction(); OutputDataPrediction = kqpProto.GetOutputDataPrediction(); StageLevel = kqpProto.GetStageLevel(); @@ -140,6 +145,10 @@ ui32 TStagePredictor::CalcTasksOptimalCount(const ui32 availableThreadsCount, co return std::max<ui32>(1, result); } +bool TStagePredictor::NeedLLVM() const { + return HasStateCombinerFlag || HasFinalCombinerFlag || HasCondenseFlag; +} + TStagePredictor& TRequestPredictor::BuildForStage(const NYql::NNodes::TDqPhyStage& stage, NYql::TExprContext& ctx) { StagePredictors.emplace_back(); TStagePredictor& result = StagePredictors.back(); diff --git a/ydb/core/kqp/query_data/kqp_predictor.h b/ydb/core/kqp/query_data/kqp_predictor.h index 8195583876..7968fc3be9 100644 --- a/ydb/core/kqp/query_data/kqp_predictor.h +++ b/ydb/core/kqp/query_data/kqp_predictor.h @@ -10,6 +10,9 @@ private: YDB_READONLY_FLAG(HasStateCombiner, false); YDB_READONLY(ui32, GroupByKeysCount, 0); + YDB_READONLY_FLAG(HasCondense, false); + YDB_READONLY(ui32, NodesCount, 0); + YDB_READONLY_FLAG(HasSort, false); YDB_READONLY_FLAG(HasMapJoin, false); YDB_READONLY_FLAG(HasUdf, false); @@ -31,7 +34,7 @@ public: void SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings& kqpProto) const; bool DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto); static ui32 GetUsableThreads(); - + bool NeedLLVM() const; ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const; }; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index a0ac81cea0..2177256163 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/mkql_proto/mkql_proto.h> +#include <ydb/core/kqp/common/simple/helpers.h> #include <ydb/core/protos/kqp_physical.pb.h> #include <ydb/core/protos/services.pb.h> @@ -151,6 +152,29 @@ TString TPreparedQueryHolder::GetText() const { return Proto->GetText(); } +void TLlvmSettings::Fill(NYql::TKikimrConfiguration::TPtr config, const NKikimrKqp::EQueryType qType) { + DisableLlvmForUdfStages = config->DisableLlvmForUdfStages(); + if (config->GetUseLlvm() == NYql::EOptionalFlag::Disabled) { + UseLlvmExternalDirective = false; + } else if (config->GetUseLlvm() == NYql::EOptionalFlag::Enabled) { + UseLlvmExternalDirective = true; + } + if (!IsSqlQuery(qType)) { + UseLlvmExternalDirective = false; + } +} + +bool TLlvmSettings::GetUseLlvm(const NYql::NDqProto::TProgram::TSettings& kqpSettingsProto) const { + TStagePredictor stagePredictor; + stagePredictor.DeserializeFromKqpSettings(kqpSettingsProto); + if (DisableLlvmForUdfStages && stagePredictor.IsHasUdf()) { + return false; + } else if (UseLlvmExternalDirective) { + return *UseLlvmExternalDirective; + } else { + return stagePredictor.NeedLLVM(); + } +} } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index f1a01bc091..42dc3cc7de 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/kqp/query_data/kqp_predictor.h> +#include <ydb/core/kqp/provider/yql_kikimr_settings.h> #include <ydb/core/protos/kqp.pb.h> #include <util/generic/vector.h> @@ -94,7 +95,19 @@ public: bool IsLiteralTx() const; }; +class TLlvmSettings { +private: + YDB_READONLY(bool, DisableLlvmForUdfStages, false); + YDB_READONLY_DEF(std::optional<bool>, UseLlvmExternalDirective); +public: + void Fill(NYql::TKikimrConfiguration::TPtr config, const NKikimrKqp::EQueryType qType); + + bool GetUseLlvm(const NYql::NDqProto::TProgram::TSettings& kqpSettingsProto) const; +}; + class TPreparedQueryHolder { +private: + YDB_ACCESSOR_DEF(TLlvmSettings, LlvmSettings); std::shared_ptr<const NKikimrKqp::TPreparedQuery> Proto; std::shared_ptr<TPreparedQueryAllocHolder> Alloc; TVector<TString> QueryTables; @@ -143,14 +156,6 @@ public: const NKqpProto::TKqpPhyQuery& GetPhysicalQuery() const { return Proto->GetPhysicalQuery(); } - - std::optional<bool> GetEnableLlvm() const { - if (Proto->HasEnableLlvm()) { - return Proto->GetEnableLlvm(); - } else { - return std::nullopt; - } - } }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 4243fe96ca..44d20ad0f9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -720,10 +720,7 @@ public: auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef(); - request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages(); YQL_ENSURE(queryState); - bool enableLlvm = queryState->PreparedQuery->GetEnableLlvm().value_or(true); - request.LlvmEnabled = enableLlvm && IsSqlQuery(queryState->GetType()); request.Snapshot = queryState->TxCtx->GetSnapshot(); return request; @@ -1003,7 +1000,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(), - AsyncIoFactory); + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 8be0b7c094..b9f32a7ca7 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1234,7 +1234,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { (`timestamp` <= CAST(1001000 AS Timestamp) AND `timestamp` >= CAST(1000999 AS Timestamp)) OR (`timestamp` > CAST(1002000 AS Timestamp)) - ORDER BY `timestamp` DESC; + ORDER BY `timestamp` DESC + LIMIT 1000; )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); @@ -1693,7 +1694,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { { TString query = fmt::format(R"( --!syntax_v1 - PRAGMA ydb.EnableLlvm = "{}"; + PRAGMA ydb.UseLlvm = "{}"; SELECT COUNT(*) diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index d0fbc4077e..d3bc30a98b 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -225,7 +225,6 @@ message TPreparedQuery { repeated TParameterDescription Parameters = 4; optional string Text = 5; optional NKqpProto.TKqpPhyQuery PhysicalQuery = 6; - optional bool EnableLlvm = 7; }; message TQueryResponse { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 1aa269779c..b27fcbf160 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -201,37 +201,37 @@ message TKqpTransaction { optional TWriteOpMeta Writes = 3; // if not set -> no writes }; - // Scan-query task meta - message TScanTaskMeta { - // represents single scan instance over the specified table, with given limit and ordering - // can cover multiple datashards - message TReadOpMeta { - // shard id hint, may change (split, merge, kick, ...) - optional uint64 ShardId = 1; - // ordered list of non-intersecting ranges (not adjacent), with points (`To` is empty and `ToInclusive` is true) - repeated NKikimrTx.TKeyRange KeyRanges = 2; - } - - enum EReadType { - ROWS = 0; - BLOCKS = 1; - } - - optional TTableMeta Table = 1; - repeated TColumnMeta Columns = 2; - repeated uint32 KeyColumnTypes = 3; // for debug logs only - repeated NKikimrProto.TTypeInfo KeyColumnTypeInfos = 13; // for debug logs only - repeated bool SkipNullKeys = 4; - repeated TReadOpMeta Reads = 5; - optional uint64 ItemsLimit = 6; - optional bool Reverse = 7; - reserved 8; // optional bytes ProcessProgram = 8; - optional EScanDataFormat DataFormat = 9; - optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables - optional bool EnableShardsSequentialScan = 11; - repeated TColumnMeta ResultColumns = 12; - // Type of read result: unboxed values or Arrow blocks of data - optional EReadType ReadType = 14; + // Scan-query task meta + message TScanTaskMeta { + // represents single scan instance over the specified table, with given limit and ordering + // can cover multiple datashards + message TReadOpMeta { + // shard id hint, may change (split, merge, kick, ...) + optional uint64 ShardId = 1; + // ordered list of non-intersecting ranges (not adjacent), with points (`To` is empty and `ToInclusive` is true) + repeated NKikimrTx.TKeyRange KeyRanges = 2; + } + + enum EReadType { + ROWS = 0; + BLOCKS = 1; + } + + optional TTableMeta Table = 1; + repeated TColumnMeta Columns = 2; + repeated uint32 KeyColumnTypes = 3; // for debug logs only + repeated NKikimrProto.TTypeInfo KeyColumnTypeInfos = 13; // for debug logs only + repeated bool SkipNullKeys = 4; + repeated TReadOpMeta Reads = 5; + optional uint64 ItemsLimit = 6; + optional bool Reverse = 7; + reserved 8; // optional bytes ProcessProgram = 8; + optional EScanDataFormat DataFormat = 9; + optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables + optional bool EnableShardsSequentialScan = 11; + repeated TColumnMeta ResultColumns = 12; + // Type of read result: unboxed values or Arrow blocks of data + optional EReadType ReadType = 14; } optional EKqpTransactionType Type = 1; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 14ea6b3e98..40bce04634 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -226,7 +226,6 @@ struct TComputeRuntimeSettings { ui32 ExtraMemoryAllocationPool = 0; bool FailOnUndelivery = true; - bool UseLLVM = false; bool UseSpilling = false; std::function<void(bool success, const TIssues& issues)> TerminateHandler; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 8975fa85b8..d78e50d997 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1459,10 +1459,6 @@ protected: return RuntimeSettings.StatsMode; } - bool GetUseLLVM() const { - return RuntimeSettings.UseLLVM; - } - const TComputeMemoryLimits& GetMemoryLimits() const { return MemoryLimits; } diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index e4d0e201ca..5c020cd52a 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -119,7 +119,7 @@ message TComputeRuntimeSettings { reserved 3; //optional NKqpProto.EKqpStatsMode LegacyStatsMode = 3; optional EDqStatsMode StatsMode = 8; - optional bool UseLLVM = 4; + optional bool UseLLVM_DeprecatedForCompatibility = 4[default = false]; optional bool UseSpilling = 6; optional uint32 TasksOnNodeCount = 5; // approx optional TRlPath RlPath = 7; diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index d0fb6acc11..d47176b2a4 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -230,6 +230,9 @@ message TDqStageStats { repeated TDqTableAggrStats Tables = 16; // is it required? repeated TDqComputeActorStats ComputeActors = 17; // more detailed stats + oneof LlvmOptions { + bool UseLlvm = 18; + } google.protobuf.Any Extra = 100; } diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 1930f20d09..da6b4e5df0 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -44,6 +44,9 @@ message TProgram { bool HasLookup = 12; double InputDataPrediction = 13; double OutputDataPrediction = 14; + + bool HasCondense = 15; + uint32 NodesCount = 16; } uint32 RuntimeVersion = 1; @@ -178,4 +181,5 @@ message TDqTask { map<string, bytes> TaskParams = 13; map<string, string> SecureParams = 14; optional uint32 MetaId = 15; + optional bool UseLlvm = 16; } diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index dbddcb874d..ca5ddbe86b 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/library/accessor/accessor.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> #include <ydb/library/yql/ast/yql_expr.h> @@ -167,7 +168,8 @@ struct TTaskOutput { template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> struct TTask { private: - std::optional<ui32> MetaId; + YDB_OPT(ui32, MetaId); + YDB_ACCESSOR(bool, UseLlvm, false); public: using TInputType = TTaskInput<TInputMeta>; using TOutputType = TTaskOutput<TOutputMeta>; @@ -178,19 +180,6 @@ public: , Outputs(stageInfo.OutputsCount) { } - bool HasMetaId() const { - return !!MetaId; - } - - void SetMetaId(const ui32 value) { - MetaId = value; - } - - ui32 GetMetaIdUnsafe() const { - Y_VERIFY(MetaId); - return *MetaId; - } - ui64 Id = 0; TStageId StageId; TVector<TInputType> Inputs; |