diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-11-17 13:56:30 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-11-17 14:25:39 +0300 |
commit | 630bbbdf9bd411b26898417221ad81109bb1f1fc (patch) | |
tree | 145ee23b3c83e6bcde3552e0bca0f1d47d58bd6a | |
parent | a447fa34889ed5c9d047e607547ab69cedfeb6a1 (diff) | |
download | ydb-630bbbdf9bd411b26898417221ad81109bb1f1fc.tar.gz |
[dq] Some fixes for async CA with spilling
YQL-17136
12 files changed, 16 insertions, 20 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index db7f86c85d..bc5bb46479 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -14,8 +14,8 @@ using namespace NYql::NDq; class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { public: - TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const TActorContext& ctx) - : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp), ctx) + TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp) + : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp)) { } diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index c9f2ffbee8..748d1a30cd 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -74,7 +74,7 @@ void TKqpComputeActor::DoBootstrap() { auto wakeup = [this]{ ContinueExecute(); }; try { PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, - std::move(wakeup), TlsActivationContext->AsActorContext())); + std::move(wakeup))); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 02159ec758..dc046412fd 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -222,8 +222,7 @@ void TKqpScanComputeActor::DoBootstrap() { TBase::SetTaskRunner(taskRunner); auto wakeup = [this] { ContinueExecute(); }; - TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), - TlsActivationContext->AsActorContext())); + TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup))); ComputeCtx.AddTableScan(0, Meta, GetStatsMode()); ScanData = &ComputeCtx.GetTableScan(0); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index ced976f1fa..07e2c5606a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -85,7 +85,7 @@ public: auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>( - TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); + TxId, RuntimeSettings.UseSpilling, std::move(wakeup)); Send(TaskRunnerActorId, new NTaskRunnerActor::TEvTaskRunnerCreate( diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 11ab98ba73..1811bb4b17 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -57,7 +57,7 @@ public: auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger); SetTaskRunner(taskRunner); auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; - TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); + TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup)); PrepareTaskRunner(execCtx); ContinueExecute(EResumeSource::CABootstrap); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 824302edde..d61cc0f54d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -629,7 +629,6 @@ protected: TaskRunner->GetAllocatorPtr()->InvalidateMemInfo(); TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck(); } - std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); State = NDqProto::COMPUTE_STATE_FAILURE; ReportStateAndMaybeDie(statusCode, issues); } diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp index 08265d3f3e..757d35ebb5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -6,17 +6,16 @@ namespace NYql { namespace NDq { -TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx) +TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp) : TxId_(txId) , WakeUp_(std::move(wakeUp)) - , Ctx_(ctx) , WithSpilling_(withSpilling) { } IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const { if (WithSpilling_) { - return CreateDqChannelStorage(TxId_, channelId, WakeUp_, Ctx_); + return CreateDqChannelStorage(TxId_, channelId, WakeUp_); } else { return nullptr; } diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h index e364dda0d2..1ef8ad8664 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -10,14 +10,13 @@ namespace NDq { class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { public: - TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx); + TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp); IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; private: const TTxId TxId_; const IDqChannelStorage::TWakeUpCallback WakeUp_; - const NActors::TActorContext& Ctx_; const bool WithSpilling_; }; diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index 4f5a55a23e..837eb2bb0e 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -202,9 +202,9 @@ private: class TDqChannelStorage : public IDqChannelStorage { public: - TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx) { + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp) { SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp)); - ctx.RegisterWithSameMailbox(SelfActor_); + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_); } ~TDqChannelStorage() { @@ -233,10 +233,9 @@ private: } // anonymous namespace -IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, - const TActorContext& ctx) +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp) { - return new TDqChannelStorage(txId, channelId, std::move(wakeUp), ctx); + return new TDqChannelStorage(txId, channelId, std::move(wakeUp)); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h index f0d6905f02..c54e9ad3cf 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h @@ -7,6 +7,6 @@ namespace NYql::NDq { NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, - NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, const NActors::TActorContext& ctx); + NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index b8ade00c4f..e7c3b4372d 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -121,6 +121,7 @@ private: if (MemoryQuota) { MemoryQuota->TryReleaseQuota(); } + TaskRunner.Reset(); TActor<TLocalTaskRunnerActor>::PassAway(); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 5b1a7c0bf4..2131ed5713 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -263,7 +263,7 @@ private: auto wakeup = [this]{ ResumeExecution(EResumeSource::Default); }; std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>( - TraceId, UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); + TraceId, UseSpilling, std::move(wakeup)); Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, NDqProto::DQ_STATS_MODE_BASIC, execCtx)); } |