aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-05 20:27:49 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-05 20:27:49 +0300
commit0d049ba5bab0ac7ce3959eb0dbfdc683ec76e288 (patch)
tree85201947eab79203adcef22802ca1ec6aca604bd
parent26bcd5285c79a277258d2e6ac804a51af3b1cbf8 (diff)
downloadydb-0d049ba5bab0ac7ce3959eb0dbfdc683ec76e288.tar.gz
resource manager configure simultaneously shards count scanner
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h15
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp12
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp11
-rw-r--r--ydb/core/kqp/node/kqp_node.cpp4
-rw-r--r--ydb/core/protos/config.proto1
5 files changed, 31 insertions, 12 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 702918be8a..4848240a6b 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,6 +16,19 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
+class TScanSettings {
+private:
+ ui32 SimultaneousShardsCount = 8;
+public:
+ TScanSettings& SetSimultaneousShardsCount(const ui32 value) {
+ SimultaneousShardsCount = value;
+ return *this;
+ }
+ ui32 GetSimultaneousShardsCount() const {
+ return SimultaneousShardsCount;
+ }
+};
+
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
@@ -26,7 +39,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
+ const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
namespace NComputeActor {
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 3caf55df4c..39633bb9f6 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -103,11 +103,12 @@ public:
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
+ , ScanSettings(scanSettings)
, Counters(counters)
{
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
@@ -677,7 +678,7 @@ private:
bool StartTableScan() {
// allow reading from multiple shards if data is not sorted
- const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : PendingShards.size();
+ const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : ScanSettings.GetSimultaneousShardsCount();
while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) {
ui64 tabletId = PendingShards.front().TabletId;
@@ -1030,6 +1031,7 @@ private:
private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrKqp::TKqpSnapshot Snapshot;
+ TScanSettings ScanSettings;
TIntrusivePtr<TKqpCounters> Counters;
TScannedDataStats Stats;
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
@@ -1052,11 +1054,11 @@ private:
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters,
- NWilson::TTraceId traceId)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
{
return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, counters, std::move(traceId));
+ functionRegistry, settings, memoryLimits, scanSettings, counters, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 5314ae802f..8785fafd8f 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -412,7 +412,7 @@ private:
static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) {
// TODO: take into account number of active scans on node
const auto& stage = GetStage(stageInfo);
- bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() ||
+ const bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() ||
stage.GetProgram().GetSettings().GetHasMapJoin();
if (heavyProgram) {
@@ -423,10 +423,10 @@ private:
}
TTask& AssignTaskToShard(
- TStageInfo& stageInfo, ui64 shardId,
+ TStageInfo& stageInfo, const ui64 shardId,
THashMap<ui64, std::vector<ui64>>& nodeTasks,
THashMap<ui64, ui64>& assignedShardsCount,
- bool sorted)
+ const bool sorted, const bool isOlapScan)
{
ui64 nodeId = ShardIdToNodeId.at(shardId);
if (stageInfo.Meta.IsOlap() && sorted) {
@@ -438,7 +438,7 @@ private:
auto& tasks = nodeTasks[nodeId];
auto& cnt = assignedShardsCount[nodeId];
- const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo);
+ const ui32 maxScansPerNode = isOlapScan ? 1 : GetMaxTasksPerNodeEstimate(stageInfo);
if (cnt < maxScansPerNode) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
@@ -470,6 +470,7 @@ private:
THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv);
bool reverse = false;
+ const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange);
ui64 itemsLimit = 0;
bool sorted = true;
TString itemsLimitParamName;
@@ -496,7 +497,7 @@ private:
for (auto& [shardId, shardInfo] : partitions) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
- auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted);
+ auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, isOlapScan);
for (auto& [name, value] : shardInfo.Params) {
auto ret = task.Meta.Params.emplace(name, std::move(value));
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 34c23cd0a4..45e0bb6413 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -286,6 +286,8 @@ private:
runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval};
+ TScanSettings scanSettings;
+ scanSettings.SetSimultaneousShardsCount(Config.GetSimultaneouslyShardsScanningCount());
auto actorSystem = TlsActivationContext->ActorSystem();
// start compute actors
@@ -319,7 +321,7 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters, NWilson::TTraceId(ev->TraceId));
+ CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanSettings, Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 1f934bc08d..4aa53404db 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1017,6 +1017,7 @@ message TTableServiceConfig {
optional uint64 MaxTotalScanBuffersSize = 13 [default = 4294967296]; // 4 GB
optional uint64 MinScanBufferSize = 14 [default = 2097152]; // 2 MB
+ optional uint32 SimultaneouslyShardsScanningCount = 15 [default = 16];
}
message TSpillingServiceConfig {