aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-06 17:35:56 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-06 17:35:56 +0300
commit4e10d96b22c65771d1a1319595630c13fe7085ef (patch)
tree797f22999b6698666d8b85ea67bccfb630fa8c85
parentbc5eb0177b4e454aec6b55e2fe68cf02de57cdbe (diff)
downloadydb-4e10d96b22c65771d1a1319595630c13fe7085ef.tar.gz
simultaneous limits for different request cases
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h18
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp54
-rw-r--r--ydb/core/kqp/node/kqp_node.cpp5
-rw-r--r--ydb/core/protos/config.proto9
4 files changed, 58 insertions, 28 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 4848240a6b5..cec92b2440b 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,17 +16,17 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
-class TScanSettings {
+class TShardsScanningPolicy {
private:
- ui32 SimultaneousShardsCount = 8;
+ const NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy ProtoConfig;
public:
- TScanSettings& SetSimultaneousShardsCount(const ui32 value) {
- SimultaneousShardsCount = value;
- return *this;
- }
- ui32 GetSimultaneousShardsCount() const {
- return SimultaneousShardsCount;
+ TShardsScanningPolicy(const NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy& pbConfig)
+ : ProtoConfig(pbConfig)
+ {
+
}
+
+ ui32 GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const;
};
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
@@ -39,7 +39,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
namespace NComputeActor {
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 39633bb9f6b..c1e35876912 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -103,12 +103,13 @@ public:
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters,
- NWilson::TTraceId traceId)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings,
+ memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
- , ScanSettings(scanSettings)
+ , ShardsScanningPolicy(shardsScanningPolicy)
, Counters(counters)
{
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
@@ -139,7 +140,7 @@ public:
NDq::TLogFunc logger;
if (IsDebugLogEnabled(actorSystem, NKikimrServices::KQP_TASKS_RUNNER)) {
- logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()] (const TString& message) {
+ logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()](const TString& message) {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_TASKS_RUNNER, "TxId: " << txId
<< ", task: " << taskId << ": " << message);
};
@@ -148,9 +149,9 @@ public:
auto taskRunner = CreateKqpTaskRunner(execCtx, settings, logger);
SetTaskRunner(taskRunner);
- auto wakeup = [this]{ ContinueExecute(); };
+ auto wakeup = [this] { ContinueExecute(); };
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup),
- TlsActivationContext->AsActorContext()));
+ TlsActivationContext->AsActorContext()));
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
@@ -672,13 +673,9 @@ private:
private:
- bool IsSortedOutput() const {
- return Meta.HasSorted() ? Meta.GetSorted() : true;
- }
-
bool StartTableScan() {
// allow reading from multiple shards if data is not sorted
- const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : ScanSettings.GetSimultaneousShardsCount();
+ const ui32 maxAllowedInFlight = ShardsScanningPolicy.GetMaxInFlightScans(Meta);
while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) {
ui64 tabletId = PendingShards.front().TabletId;
@@ -1031,7 +1028,7 @@ private:
private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrKqp::TKqpSnapshot Snapshot;
- TScanSettings ScanSettings;
+ TShardsScanningPolicy ShardsScanningPolicy;
TIntrusivePtr<TKqpCounters> Counters;
TScannedDataStats Stats;
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
@@ -1051,14 +1048,41 @@ private:
} // anonymous namespace
+ui32 TShardsScanningPolicy::GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const {
+ const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true);
+ if (meta.HasOlapProgram()) {
+ NKikimrSSA::TProgram program;
+ Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram()));
+ std::optional<ui32> aggregationLimit;
+ for (auto&& command : program.GetCommand()) {
+ if (!command.HasGroupBy()) {
+ aggregationLimit = Min(aggregationLimit.value_or(ProtoConfig.GetScanLimit()), ProtoConfig.GetScanLimit());
+ continue;
+ }
+ ui32 aggrLimit;
+ if (command.GetGroupBy().GetKeyColumns().size()) {
+ aggrLimit = ProtoConfig.GetAggregationGroupByLimit();
+ } else {
+ aggrLimit = ProtoConfig.GetAggregationNoGroupLimit();
+ }
+ aggregationLimit = Min(aggregationLimit.value_or(aggrLimit), aggrLimit);
+ }
+ return aggregationLimit.value_or(ProtoConfig.GetAggregationGroupByLimit());
+ } else if (isSorted) {
+ return 1;
+ } else {
+ return ProtoConfig.GetScanLimit();
+ }
+}
+
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const TScanSettings& scanSettings, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
{
return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, scanSettings, counters, std::move(traceId));
+ functionRegistry, settings, memoryLimits, shardsScanningPolicy, counters, std::move(traceId));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 45e0bb64133..c125129d319 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -286,8 +286,7 @@ private:
runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval};
- TScanSettings scanSettings;
- scanSettings.SetSimultaneousShardsCount(Config.GetSimultaneouslyShardsScanningCount());
+ TShardsScanningPolicy scanPolicy(Config.GetShardsScanningPolicy());
auto actorSystem = TlsActivationContext->ActorSystem();
// start compute actors
@@ -321,7 +320,7 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanSettings, Counters, NWilson::TTraceId(ev->TraceId));
+ CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 4aa53404dbe..e3cbf2005d3 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1000,6 +1000,13 @@ message TTableProfilesConfig {
}
message TTableServiceConfig {
+
+ message TShardsScanningPolicy {
+ optional uint32 AggregationGroupByLimit = 1 [default = 256];
+ optional uint32 AggregationNoGroupLimit = 2 [default = 1024];
+ optional uint32 ScanLimit = 3 [default = 16];
+ }
+
message TResourceManager {
optional uint32 ComputeActorsCount = 1 [default = 1000];
optional uint64 ChannelBufferSize = 2 [default = 4194304]; // 4 MB
@@ -1017,7 +1024,7 @@ message TTableServiceConfig {
optional uint64 MaxTotalScanBuffersSize = 13 [default = 4294967296]; // 4 GB
optional uint64 MinScanBufferSize = 14 [default = 2097152]; // 2 MB
- optional uint32 SimultaneouslyShardsScanningCount = 15 [default = 16];
+ optional TShardsScanningPolicy ShardsScanningPolicy = 16;
}
message TSpillingServiceConfig {