diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-05 20:27:49 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-05 20:27:49 +0300 |
commit | 0d049ba5bab0ac7ce3959eb0dbfdc683ec76e288 (patch) | |
tree | 85201947eab79203adcef22802ca1ec6aca604bd | |
parent | 26bcd5285c79a277258d2e6ac804a51af3b1cbf8 (diff) | |
download | ydb-0d049ba5bab0ac7ce3959eb0dbfdc683ec76e288.tar.gz |
resource manager configure simultaneously shards count scanner
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/node/kqp_node.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 |
5 files changed, 31 insertions, 12 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 702918be8a..4848240a6b 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -16,6 +16,19 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { +class TScanSettings { +private: + ui32 SimultaneousShardsCount = 8; +public: + TScanSettings& SetSimultaneousShardsCount(const ui32 value) { + SimultaneousShardsCount = value; + return *this; + } + ui32 GetSimultaneousShardsCount() const { + return SimultaneousShardsCount; + } +}; + IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, @@ -26,7 +39,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, - TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); + const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); namespace NComputeActor { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 3caf55df4c..39633bb9f6 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -103,11 +103,12 @@ public: TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) + , ScanSettings(scanSettings) , Counters(counters) { YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString()); @@ -677,7 +678,7 @@ private: bool StartTableScan() { // allow reading from multiple shards if data is not sorted - const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : PendingShards.size(); + const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : ScanSettings.GetSimultaneousShardsCount(); while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) { ui64 tabletId = PendingShards.front().TabletId; @@ -1030,6 +1031,7 @@ private: private: NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrKqp::TKqpSnapshot Snapshot; + TScanSettings ScanSettings; TIntrusivePtr<TKqpCounters> Counters; TScannedDataStats Stats; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; @@ -1052,11 +1054,11 @@ private: IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters, - NWilson::TTraceId traceId) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) { return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, counters, std::move(traceId)); + functionRegistry, settings, memoryLimits, scanSettings, counters, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 5314ae802f..8785fafd8f 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -412,7 +412,7 @@ private: static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) { // TODO: take into account number of active scans on node const auto& stage = GetStage(stageInfo); - bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || + const bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || stage.GetProgram().GetSettings().GetHasMapJoin(); if (heavyProgram) { @@ -423,10 +423,10 @@ private: } TTask& AssignTaskToShard( - TStageInfo& stageInfo, ui64 shardId, + TStageInfo& stageInfo, const ui64 shardId, THashMap<ui64, std::vector<ui64>>& nodeTasks, THashMap<ui64, ui64>& assignedShardsCount, - bool sorted) + const bool sorted, const bool isOlapScan) { ui64 nodeId = ShardIdToNodeId.at(shardId); if (stageInfo.Meta.IsOlap() && sorted) { @@ -438,7 +438,7 @@ private: auto& tasks = nodeTasks[nodeId]; auto& cnt = assignedShardsCount[nodeId]; - const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo); + const ui32 maxScansPerNode = isOlapScan ? 1 : GetMaxTasksPerNodeEstimate(stageInfo); if (cnt < maxScansPerNode) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; @@ -470,6 +470,7 @@ private: THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv); bool reverse = false; + const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange); ui64 itemsLimit = 0; bool sorted = true; TString itemsLimitParamName; @@ -496,7 +497,7 @@ private: for (auto& [shardId, shardInfo] : partitions) { YQL_ENSURE(!shardInfo.KeyWriteRanges); - auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted); + auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, isOlapScan); for (auto& [name, value] : shardInfo.Params) { auto ret = task.Meta.Params.emplace(name, std::move(value)); diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 34c23cd0a4..45e0bb6413 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -286,6 +286,8 @@ private: runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval}; + TScanSettings scanSettings; + scanSettings.SetSimultaneousShardsCount(Config.GetSimultaneouslyShardsScanningCount()); auto actorSystem = TlsActivationContext->ActorSystem(); // start compute actors @@ -319,7 +321,7 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters, NWilson::TTraceId(ev->TraceId)); + CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanSettings, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 1f934bc08d..4aa53404db 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1017,6 +1017,7 @@ message TTableServiceConfig { optional uint64 MaxTotalScanBuffersSize = 13 [default = 4294967296]; // 4 GB optional uint64 MinScanBufferSize = 14 [default = 2097152]; // 2 MB + optional uint32 SimultaneouslyShardsScanningCount = 15 [default = 16]; } message TSpillingServiceConfig { |