aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2023-11-17 13:56:30 +0300
committerudovichenko-r <rvu@ydb.tech>2023-11-17 14:25:39 +0300
commit630bbbdf9bd411b26898417221ad81109bb1f1fc (patch)
tree145ee23b3c83e6bcde3552e0bca0f1d47d58bd6a
parenta447fa34889ed5c9d047e607547ab69cedfeb6a1 (diff)
downloadydb-630bbbdf9bd411b26898417221ad81109bb1f1fc.tar.gz
[dq] Some fixes for async CA with spilling
YQL-17136
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h3
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.cpp9
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.h2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp1
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp2
12 files changed, 16 insertions, 20 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index db7f86c85d..bc5bb46479 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
- TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const TActorContext& ctx)
- : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp), ctx)
+ TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
+ : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp))
{
}
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index c9f2ffbee8..748d1a30cd 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -74,7 +74,7 @@ void TKqpComputeActor::DoBootstrap() {
auto wakeup = [this]{ ContinueExecute(); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling,
- std::move(wakeup), TlsActivationContext->AsActorContext()));
+ std::move(wakeup)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 02159ec758..dc046412fd 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -222,8 +222,7 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);
auto wakeup = [this] { ContinueExecute(); };
- TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup),
- TlsActivationContext->AsActorContext()));
+ TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index ced976f1fa..07e2c5606a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -85,7 +85,7 @@ public:
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
- TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
+ TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 11ab98ba73..1811bb4b17 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -57,7 +57,7 @@ public:
auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
- TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
+ TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
PrepareTaskRunner(execCtx);
ContinueExecute(EResumeSource::CABootstrap);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 824302edde..d61cc0f54d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -629,7 +629,6 @@ protected:
TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
}
- std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
State = NDqProto::COMPUTE_STATE_FAILURE;
ReportStateAndMaybeDie(statusCode, issues);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
index 08265d3f3e..757d35ebb5 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
@@ -6,17 +6,16 @@
namespace NYql {
namespace NDq {
-TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx)
+TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TxId_(txId)
, WakeUp_(std::move(wakeUp))
- , Ctx_(ctx)
, WithSpilling_(withSpilling)
{
}
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
if (WithSpilling_) {
- return CreateDqChannelStorage(TxId_, channelId, WakeUp_, Ctx_);
+ return CreateDqChannelStorage(TxId_, channelId, WakeUp_);
} else {
return nullptr;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
index e364dda0d2..1ef8ad8664 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
@@ -10,14 +10,13 @@ namespace NDq {
class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
public:
- TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx);
+ TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
private:
const TTxId TxId_;
const IDqChannelStorage::TWakeUpCallback WakeUp_;
- const NActors::TActorContext& Ctx_;
const bool WithSpilling_;
};
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
index 4f5a55a23e..837eb2bb0e 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
@@ -202,9 +202,9 @@ private:
class TDqChannelStorage : public IDqChannelStorage {
public:
- TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx) {
+ TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp) {
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp));
- ctx.RegisterWithSameMailbox(SelfActor_);
+ TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_);
}
~TDqChannelStorage() {
@@ -233,10 +233,9 @@ private:
} // anonymous namespace
-IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp,
- const TActorContext& ctx)
+IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp)
{
- return new TDqChannelStorage(txId, channelId, std::move(wakeUp), ctx);
+ return new TDqChannelStorage(txId, channelId, std::move(wakeUp));
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h
index f0d6905f02..c54e9ad3cf 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h
@@ -7,6 +7,6 @@
namespace NYql::NDq {
NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
- NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, const NActors::TActorContext& ctx);
+ NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index b8ade00c4f..e7c3b4372d 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -121,6 +121,7 @@ private:
if (MemoryQuota) {
MemoryQuota->TryReleaseQuota();
}
+ TaskRunner.Reset();
TActor<TLocalTaskRunnerActor>::PassAway();
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 5b1a7c0bf4..2131ed5713 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -263,7 +263,7 @@ private:
auto wakeup = [this]{ ResumeExecution(EResumeSource::Default); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
- TraceId, UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
+ TraceId, UseSpilling, std::move(wakeup));
Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, NDqProto::DQ_STATS_MODE_BASIC, execCtx));
}