summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2022-10-31 09:23:05 +0300
committerva-kuznecov <[email protected]>2022-10-31 09:23:05 +0300
commit88b425c10423300e71eb3edd18094b4973478571 (patch)
tree522c263cfcd9d70ec3084bc97010330f47538ca9
parent98896faa82d9e07cee4a6356eb920c5da6196f3b (diff)
Fix memory limits in pure and scan requests
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp44
1 files changed, 18 insertions, 26 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 70357b9320b..3e8c7122791 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -767,9 +767,9 @@ public:
Cleanup(IsFatalError(record.GetYdbStatus()));
}
- IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) {
+ IKqpGateway::TExecPhysicalRequest PrepareRequest(TKqpQueryState *queryState) {
IKqpGateway::TExecPhysicalRequest request;
- request.NeedTxId = false;
+
if (queryState) {
auto now = TAppData::TimeProvider->Now();
request.Timeout = queryState->QueryDeadlines.TimeoutAt - now;
@@ -780,50 +780,42 @@ public:
EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request);
request.StatsMode = GetStatsMode(statsMode);
}
+
+ const auto& limits = GetQueryLimits(Settings);
+ request.MaxAffectedShards = limits.PhaseLimits.AffectedShardsLimit;
+ request.TotalReadSizeLimitBytes = limits.PhaseLimits.TotalReadSizeLimitBytes;
+ request.MkqlMemoryLimit = limits.PhaseLimits.ComputeNodeMemoryLimitBytes;
return request;
}
- IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) {
- IKqpGateway::TExecPhysicalRequest request;
- if (queryState) {
- auto now = TAppData::TimeProvider->Now();
- request.Timeout = queryState->QueryDeadlines.TimeoutAt - now;
- if (auto cancelAt = queryState->QueryDeadlines.CancelAt) {
- request.CancelAfter = cancelAt - now;
- }
+ IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) {
+ auto request = PrepareRequest(queryState);
+ request.NeedTxId = false;
+ return request;
+ }
- EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request);
- request.StatsMode = GetStatsMode(statsMode);
+ IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) {
+ auto request = PrepareRequest(queryState);
+ if (queryState) {
request.Snapshot = queryState->TxCtx->GetSnapshot();
request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel;
} else {
request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
- const auto& limits = GetQueryLimits(Settings);
- request.MaxAffectedShards = limits.PhaseLimits.AffectedShardsLimit;
- request.TotalReadSizeLimitBytes = limits.PhaseLimits.TotalReadSizeLimitBytes;
- request.MkqlMemoryLimit = limits.PhaseLimits.ComputeNodeMemoryLimitBytes;
-
return request;
}
IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) {
- IKqpGateway::TExecPhysicalRequest request;
+ auto request = PrepareRequest(queryState);
- request.Timeout = queryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
- if (!request.Timeout) {
- // TODO: Just cancel request.
- request.Timeout = TDuration::MilliSeconds(1);
- }
request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef();
- EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request);
- request.StatsMode = GetStatsMode(statsMode);
request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages();
request.LlvmEnabled = Config->GetEnableLlvm() != EOptionalFlag::Disabled;
- request.Snapshot = QueryState->TxCtx->GetSnapshot();
+ YQL_ENSURE(queryState);
+ request.Snapshot = queryState->TxCtx->GetSnapshot();
return request;
}