diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-01-31 10:48:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 10:48:27 +0300 |
commit | d224e9843d2bd10b29b0eb2067b1bc13c2a6040c (patch) | |
tree | 24dce63f8e6ad162cf206c3b25878684338621c0 | |
parent | 827a532736dc6a7c9f435060d451b5fba6d577d2 (diff) | |
download | ydb-d224e9843d2bd10b29b0eb2067b1bc13c2a6040c.tar.gz |
YQL-17542 move PrepareTaskRunner to TDqSyncComputeActorBase (#1416)
3 files changed, 34 insertions, 34 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c06bfb99d3..94aaa3811e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1461,32 +1461,6 @@ public: } protected: - void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) { - TaskRunner = taskRunner; - } - - void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { - YQL_ENSURE(TaskRunner); - - auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit()); - auto* alloc = guard.GetMutex(); - - MemoryQuota->TrySetIncreaseMemoryLimitCallback(alloc); - - TDqTaskRunnerMemoryLimits limits; - limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize; - limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - - TaskRunner->Prepare(Task, limits, execCtx); - - FillIoMaps( - TaskRunner->GetHolderFactory(), - TaskRunner->GetTypeEnv(), - TaskRunner->GetSecureParams(), - TaskRunner->GetTaskParams(), - TaskRunner->GetReadRanges()); - } - void FillIoMaps( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h index aed3dc5240..c70216b9e4 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h @@ -6,19 +6,19 @@ #endif #define CA_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 0d10b8e7c8..8d2ffb0092 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -43,6 +43,32 @@ public: { return inputTransformInfo.Buffer.Get(); } +protected: + void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) { + this->TaskRunner = taskRunner; + } + + void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { + YQL_ENSURE(this->TaskRunner); + + auto guard = this->TaskRunner->BindAllocator(this->MemoryQuota->GetMkqlMemoryLimit()); + auto* alloc = guard.GetMutex(); + + this->MemoryQuota->TrySetIncreaseMemoryLimitCallback(alloc); + + TDqTaskRunnerMemoryLimits limits; + limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize; + limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + + this->TaskRunner->Prepare(this->Task, limits, execCtx); + + TBase::FillIoMaps( + this->TaskRunner->GetHolderFactory(), + this->TaskRunner->GetTypeEnv(), + this->TaskRunner->GetSecureParams(), + this->TaskRunner->GetTaskParams(), + this->TaskRunner->GetReadRanges()); + } }; } //namespace NYql::NDq |