diff options
| author | va-kuznecov <[email protected]> | 2022-10-31 09:23:05 +0300 |
|---|---|---|
| committer | va-kuznecov <[email protected]> | 2022-10-31 09:23:05 +0300 |
| commit | 88b425c10423300e71eb3edd18094b4973478571 (patch) | |
| tree | 522c263cfcd9d70ec3084bc97010330f47538ca9 | |
| parent | 98896faa82d9e07cee4a6356eb920c5da6196f3b (diff) | |
Fix memory limits in pure and scan requests
| -rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 44 |
1 files changed, 18 insertions, 26 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 70357b9320b..3e8c7122791 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -767,9 +767,9 @@ public: Cleanup(IsFatalError(record.GetYdbStatus())); } - IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) { + IKqpGateway::TExecPhysicalRequest PrepareRequest(TKqpQueryState *queryState) { IKqpGateway::TExecPhysicalRequest request; - request.NeedTxId = false; + if (queryState) { auto now = TAppData::TimeProvider->Now(); request.Timeout = queryState->QueryDeadlines.TimeoutAt - now; @@ -780,50 +780,42 @@ public: EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request); request.StatsMode = GetStatsMode(statsMode); } + + const auto& limits = GetQueryLimits(Settings); + request.MaxAffectedShards = limits.PhaseLimits.AffectedShardsLimit; + request.TotalReadSizeLimitBytes = limits.PhaseLimits.TotalReadSizeLimitBytes; + request.MkqlMemoryLimit = limits.PhaseLimits.ComputeNodeMemoryLimitBytes; return request; } - IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) { - IKqpGateway::TExecPhysicalRequest request; - if (queryState) { - auto now = TAppData::TimeProvider->Now(); - request.Timeout = queryState->QueryDeadlines.TimeoutAt - now; - if (auto cancelAt = queryState->QueryDeadlines.CancelAt) { - request.CancelAfter = cancelAt - now; - } + IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) { + auto request = PrepareRequest(queryState); + request.NeedTxId = false; + return request; + } - EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request); - request.StatsMode = GetStatsMode(statsMode); + IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) { + auto request = PrepareRequest(queryState); + if (queryState) { request.Snapshot = queryState->TxCtx->GetSnapshot(); request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel; } else { request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; } - const auto& limits = GetQueryLimits(Settings); - request.MaxAffectedShards = limits.PhaseLimits.AffectedShardsLimit; - request.TotalReadSizeLimitBytes = limits.PhaseLimits.TotalReadSizeLimitBytes; - request.MkqlMemoryLimit = limits.PhaseLimits.ComputeNodeMemoryLimitBytes; - return request; } IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) { - IKqpGateway::TExecPhysicalRequest request; + auto request = PrepareRequest(queryState); - request.Timeout = queryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); - if (!request.Timeout) { - // TODO: Just cancel request. - request.Timeout = TDuration::MilliSeconds(1); - } request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef(); - EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request); - request.StatsMode = GetStatsMode(statsMode); request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages(); request.LlvmEnabled = Config->GetEnableLlvm() != EOptionalFlag::Disabled; - request.Snapshot = QueryState->TxCtx->GetSnapshot(); + YQL_ENSURE(queryState); + request.Snapshot = queryState->TxCtx->GetSnapshot(); return request; } |
