diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-06 17:35:56 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-06 17:35:56 +0300 |
commit | 4e10d96b22c65771d1a1319595630c13fe7085ef (patch) | |
tree | 797f22999b6698666d8b85ea67bccfb630fa8c85 | |
parent | bc5eb0177b4e454aec6b55e2fe68cf02de57cdbe (diff) | |
download | ydb-4e10d96b22c65771d1a1319595630c13fe7085ef.tar.gz |
simultaneous limits for different request cases
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 18 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 54 | ||||
-rw-r--r-- | ydb/core/kqp/node/kqp_node.cpp | 5 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 9 |
4 files changed, 58 insertions, 28 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 4848240a6b5..cec92b2440b 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -16,17 +16,17 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { -class TScanSettings { +class TShardsScanningPolicy { private: - ui32 SimultaneousShardsCount = 8; + const NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy ProtoConfig; public: - TScanSettings& SetSimultaneousShardsCount(const ui32 value) { - SimultaneousShardsCount = value; - return *this; - } - ui32 GetSimultaneousShardsCount() const { - return SimultaneousShardsCount; + TShardsScanningPolicy(const NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy& pbConfig) + : ProtoConfig(pbConfig) + { + } + + ui32 GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const; }; IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, @@ -39,7 +39,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, - const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); + const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); namespace NComputeActor { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 39633bb9f6b..c1e35876912 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -103,12 +103,13 @@ public: TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, - NWilson::TTraceId traceId) - : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, + memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) - , ScanSettings(scanSettings) + , ShardsScanningPolicy(shardsScanningPolicy) , Counters(counters) { YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString()); @@ -139,7 +140,7 @@ public: NDq::TLogFunc logger; if (IsDebugLogEnabled(actorSystem, NKikimrServices::KQP_TASKS_RUNNER)) { - logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()] (const TString& message) { + logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()](const TString& message) { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_TASKS_RUNNER, "TxId: " << txId << ", task: " << taskId << ": " << message); }; @@ -148,9 +149,9 @@ public: auto taskRunner = CreateKqpTaskRunner(execCtx, settings, logger); SetTaskRunner(taskRunner); - auto wakeup = [this]{ ContinueExecute(); }; + auto wakeup = [this] { ContinueExecute(); }; PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), - TlsActivationContext->AsActorContext())); + TlsActivationContext->AsActorContext())); ComputeCtx.AddTableScan(0, Meta, GetStatsMode()); ScanData = &ComputeCtx.GetTableScan(0); @@ -672,13 +673,9 @@ private: private: - bool IsSortedOutput() const { - return Meta.HasSorted() ? Meta.GetSorted() : true; - } - bool StartTableScan() { // allow reading from multiple shards if data is not sorted - const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : ScanSettings.GetSimultaneousShardsCount(); + const ui32 maxAllowedInFlight = ShardsScanningPolicy.GetMaxInFlightScans(Meta); while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) { ui64 tabletId = PendingShards.front().TabletId; @@ -1031,7 +1028,7 @@ private: private: NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrKqp::TKqpSnapshot Snapshot; - TScanSettings ScanSettings; + TShardsScanningPolicy ShardsScanningPolicy; TIntrusivePtr<TKqpCounters> Counters; TScannedDataStats Stats; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; @@ -1051,14 +1048,41 @@ private: } // anonymous namespace +ui32 TShardsScanningPolicy::GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const { + const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true); + if (meta.HasOlapProgram()) { + NKikimrSSA::TProgram program; + Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram())); + std::optional<ui32> aggregationLimit; + for (auto&& command : program.GetCommand()) { + if (!command.HasGroupBy()) { + aggregationLimit = Min(aggregationLimit.value_or(ProtoConfig.GetScanLimit()), ProtoConfig.GetScanLimit()); + continue; + } + ui32 aggrLimit; + if (command.GetGroupBy().GetKeyColumns().size()) { + aggrLimit = ProtoConfig.GetAggregationGroupByLimit(); + } else { + aggrLimit = ProtoConfig.GetAggregationNoGroupLimit(); + } + aggregationLimit = Min(aggregationLimit.value_or(aggrLimit), aggrLimit); + } + return aggregationLimit.value_or(ProtoConfig.GetAggregationGroupByLimit()); + } else if (isSorted) { + return 1; + } else { + return ProtoConfig.GetScanLimit(); + } +} + IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) + const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) { return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, scanSettings, counters, std::move(traceId)); + functionRegistry, settings, memoryLimits, shardsScanningPolicy, counters, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 45e0bb64133..c125129d319 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -286,8 +286,7 @@ private: runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval}; - TScanSettings scanSettings; - scanSettings.SetSimultaneousShardsCount(Config.GetSimultaneouslyShardsScanningCount()); + TShardsScanningPolicy scanPolicy(Config.GetShardsScanningPolicy()); auto actorSystem = TlsActivationContext->ActorSystem(); // start compute actors @@ -321,7 +320,7 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanSettings, Counters, NWilson::TTraceId(ev->TraceId)); + CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 4aa53404dbe..e3cbf2005d3 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1000,6 +1000,13 @@ message TTableProfilesConfig { } message TTableServiceConfig { + + message TShardsScanningPolicy { + optional uint32 AggregationGroupByLimit = 1 [default = 256]; + optional uint32 AggregationNoGroupLimit = 2 [default = 1024]; + optional uint32 ScanLimit = 3 [default = 16]; + } + message TResourceManager { optional uint32 ComputeActorsCount = 1 [default = 1000]; optional uint64 ChannelBufferSize = 2 [default = 4194304]; // 4 MB @@ -1017,7 +1024,7 @@ message TTableServiceConfig { optional uint64 MaxTotalScanBuffersSize = 13 [default = 4294967296]; // 4 GB optional uint64 MinScanBufferSize = 14 [default = 2097152]; // 2 MB - optional uint32 SimultaneouslyShardsScanningCount = 15 [default = 16]; + optional TShardsScanningPolicy ShardsScanningPolicy = 16; } message TSpillingServiceConfig { |