aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <filitovme@gmail.com>2024-08-09 19:08:53 +0200
committerGitHub <noreply@github.com>2024-08-09 19:08:53 +0200
commit49e62b47eec455deb9139643a0f6b56960352b9a (patch)
tree3ff6adcc2e1edaa2ca1fcde9c32bb4ca4f1cb784
parentee6b772c5f61986bc4b03006fb56680cd91a3a30 (diff)
downloadydb-49e62b47eec455deb9139643a0f6b56960352b9a.tar.gz
Handle spilling errors correctly (#7435)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp3
-rw-r--r--ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp64
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp15
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h8
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.cpp11
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.h4
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp27
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage.cpp4
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage.h2
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp22
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/spilling/spiller_factory.h10
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp4
-rw-r--r--ydb/library/yql/dq/common/dq_common.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_storage.h2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h9
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp5
25 files changed, 145 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index 2633037dcb..bdfbb2eeb7 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
- TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
- : TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
+ TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
+ : TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
{
}
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 2dbe94b66b..ab43bc1e5f 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);
- auto wakeup = [this]{ ContinueExecute(); };
+ auto wakeupCallback = [this]{ ContinueExecute(); };
+ auto errorCallback = [this](const TString& error){ SendError(error); };
try {
- PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
+ PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 5fd47645cc..6f3f43e91b 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -220,7 +220,8 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);
auto wakeup = [this] { ContinueExecute(); };
- TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
+ auto errorCallback = [this](const TString& error){ SendError(error); };
+ TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
diff --git a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
index 691c3d308f..ab5ff38287 100644
--- a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
+++ b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
@@ -32,7 +32,7 @@ NKikimrConfig::TAppConfig AppCfg() {
return appCfg;
}
-NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
+NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool enableSpilling=true) {
NKikimrConfig::TAppConfig appCfg;
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
@@ -43,12 +43,32 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
- spilling->SetEnable(true);
+ spilling->SetEnable(enableSpilling);
spilling->SetRoot("./spilling/");
return appCfg;
}
+void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) {
+ for (ui32 i = 0; i < numRows; ++i) {
+ auto result = db.ExecuteQuery(Sprintf(R"(
+ --!syntax_v1
+ REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
+ )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+}
+
+constexpr auto SimpleGraceJoinWithSpillingQuery = R"(
+ --!syntax_v1
+ PRAGMA ydb.EnableSpillingNodes="GraceJoin";
+ PRAGMA ydb.CostBasedOptimizationLevel='0';
+ PRAGMA ydb.HashJoinMode='graceandself';
+ select t1.Key, t1.Value, t2.Key, t2.Value
+ from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
+ order by t1.Value
+ )";
+
} // anonymous namespace
@@ -79,31 +99,15 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
auto db = kikimr.GetQueryClient();
- for (ui32 i = 0; i < 300; ++i) {
- auto result = db.ExecuteQuery(Sprintf(R"(
- --!syntax_v1
- REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
- )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- auto query = R"(
- --!syntax_v1
- PRAGMA ydb.EnableSpillingNodes="GraceJoin";
- PRAGMA ydb.CostBasedOptimizationLevel='0';
- PRAGMA ydb.HashJoinMode='graceandself';
- select t1.Key, t1.Value, t2.Key, t2.Value
- from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
- order by t1.Value
- )";
+ FillTableWithData(db);
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
- auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
+ auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
Cerr << planres.GetStats()->GetAst() << Endl;
- auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
+ auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -116,6 +120,24 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
}
}
+Y_UNIT_TEST(HandleErrorsCorrectly) {
+ Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
+ TKikimrRunner kikimr(AppCfgLowComputeLimits(0.01, false));
+
+ auto db = kikimr.GetQueryClient();
+
+ FillTableWithData(db);
+
+ auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
+ auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
+
+ Cerr << planres.GetStats()->GetAst() << Endl;
+
+ auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString());
+}
+
Y_UNIT_TEST(SelfJoinQueryService) {
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index af3eb3c31c..d61431eb16 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -1012,7 +1012,11 @@ public:
return {};
}
- std::function<void()> GetWakeupCallback() const override {
+ NDq::TWakeUpCallback GetWakeupCallback() const override {
+ return {};
+ }
+
+ NDq::TErrorCallback GetErrorCallback() const override {
return {};
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index f6629fc5b5..dac377c889 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -127,8 +127,9 @@ public:
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);
- auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
- std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));
+ auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
+ auto errorCallback = [this](const TString& error){ SendError(error); };
+ std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeupCallback), std::move(errorCallback));
Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 8851f301f7..2f6335e11d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -58,8 +58,9 @@ public:
auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
- auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
- TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
+ auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
+ auto errorCallback = [this](const TString& error){ SendError(error); };
+ TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback));
PrepareTaskRunner(execCtx);
ContinueExecute(EResumeSource::CABootstrap);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 4baded583b..9536882a2a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -629,6 +629,10 @@ protected:
}
}
+ void SendError(const TString& error) {
+ this->Send(this->SelfId(), TEvDq::TEvAbortExecution::InternalError(error));
+ }
+
protected: //TDqComputeActorChannels::ICallbacks
//i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index 8a776d83fb..9f139c5ea9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -213,7 +213,7 @@ protected:
TaskRunner->Prepare(this->Task, limits, execCtx);
if (this->Task.GetEnableSpilling()) {
- TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback()));
+ TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
}
for (auto& [channelId, channel] : this->InputChannelsMap) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
index 2f50fcbd5a..764920be05 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
@@ -6,9 +6,10 @@
namespace NYql {
namespace NDq {
-TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
+TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TxId_(txId)
- , WakeUp_(std::move(wakeUp))
+ , WakeUpCallback_(std::move(wakeUpCallback))
+ , ErrorCallback_(std::move(errorCallback))
{
}
@@ -18,14 +19,18 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
if (withSpilling) {
- return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
+ return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem);
} else {
return nullptr;
}
}
-std::function<void()> TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
- return WakeUp_;
+TWakeUpCallback TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
+ return WakeUpCallback_;
+}
+
+TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const {
+ return ErrorCallback_;
}
TTxId TDqTaskRunnerExecutionContext::GetTxId() const {
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
index f7fc7a0dea..ac0bc49482 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
@@ -9,17 +9,19 @@ namespace NDq {
class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
public:
- TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);
+ TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& WakeUpCallback_, TErrorCallback&& ErrorCallback_);
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override;
- std::function<void()> GetWakeupCallback() const override;
+ TWakeUpCallback GetWakeupCallback() const override;
+ TErrorCallback GetErrorCallback() const override;
TTxId GetTxId() const override;
private:
const TTxId TxId_;
- const IDqChannelStorage::TWakeUpCallback WakeUp_;
+ const TWakeUpCallback WakeUpCallback_;
+ const TErrorCallback ErrorCallback_;
};
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
index 79ca0fdae1..90f5a9a0e4 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
@@ -30,10 +30,10 @@ class TDqChannelStorage : public IDqChannelStorage {
NThreading::TFuture<void> IsBlobWrittenFuture_;
};
public:
- TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
+ TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
: ActorSystem_(actorSystem)
{
- ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
+ ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor());
}
@@ -119,9 +119,12 @@ private:
} // anonymous namespace
-IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
+IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
+ TWakeUpCallback wakeUpCallback,
+ TErrorCallback errorCallback,
+ TActorSystem* actorSystem)
{
- return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
+ return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h
index 03ce1b8f6f..8588fd6519 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h
@@ -11,6 +11,8 @@ namespace NActors {
namespace NYql::NDq {
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
- IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
+ TWakeUpCallback wakeUpCallback,
+ TErrorCallback errorCallback,
+ NActors::TActorSystem* actorSystem);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
index 4418ff61dd..b00e505eb9 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
@@ -43,10 +43,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
public:
- TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
+ TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
: TxId_(txId)
, ChannelId_(channelId)
- , WakeUp_(std::move(wakeUp))
+ , WakeUpCallback_(std::move(wakeUpCallback))
+ , ErrorCallback_(std::move(errorCallback))
, ActorSystem_(actorSystem)
{}
@@ -65,13 +66,12 @@ public:
protected:
void FailWithError(const TString& error) {
+ if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str());
+
LOG_E("Error: " << error);
+ ErrorCallback_(error);
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
PassAway();
-
- // Currently there is no better way to handle the error.
- // Since the message was not sent from the actor system, there is no one to send the error message to.
- Y_ABORT("Error: %s", error.c_str());
}
void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
@@ -130,7 +130,7 @@ private:
it->second.SetValue();
WritingBlobs_.erase(it);
- WakeUp_();
+ WakeUpCallback_();
}
void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
@@ -146,7 +146,7 @@ private:
it->second.SetValue(std::move(msg.Blob));
LoadingBlobs_.erase(it);
- WakeUp_();
+ WakeUpCallback_();
}
void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
@@ -163,7 +163,8 @@ private:
private:
const TTxId TxId_;
const ui64 ChannelId_;
- IDqChannelStorage::TWakeUpCallback WakeUp_;
+ TWakeUpCallback WakeUpCallback_;
+ TErrorCallback ErrorCallback_;
TActorId SpillingActorId_;
// BlobId -> promise that blob is saved
@@ -177,8 +178,12 @@ private:
} // anonymous namespace
-IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) {
- return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
+IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId,
+ TWakeUpCallback&& wakeUpCallback,
+ TErrorCallback&& errorCallback,
+ NActors::TActorSystem* actorSystem)
+{
+ return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
index 7b509af38a..cf9f9c997b 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
@@ -49,6 +49,6 @@ public:
virtual NActors::IActor* GetActor() = 0;
};
-IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem);
+IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, NActors::TActorSystem* actorSystem);
} // namespace NYql::NDq \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
index cba96b115a..e6d4ea468e 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
@@ -6,10 +6,10 @@ namespace NYql::NDq {
using namespace NActors;
-TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
+TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TStringStream spillerName;
spillerName << "Spiller" << "_" << CreateGuidAsString();
- ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback);
+ ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback);
ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
}
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h
index 695770bfef..ca8d19c142 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h
@@ -17,7 +17,7 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
{
public:
- TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, NActors::TActorSystem* actorSystem);
+ TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, NActors::TActorSystem* actorSystem);
~TDqComputeStorage();
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
index 77b37f2a3e..47c800b5bd 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
@@ -40,10 +40,11 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
// void promise that completes when block is removed
using TDeletingBlobInfo = NThreading::TPromise<void>;
public:
- TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback)
+ TDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback)
: TxId_(txId),
SpillerName_(spillerName),
- WakeupCallback_(wakeupCallback)
+ WakeupCallback_(wakeupCallback),
+ ErrorCallback_(errorCallback)
{
}
@@ -63,18 +64,16 @@ public:
protected:
void FailWithError(const TString& error) {
+ if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str());
+
LOG_E("Error: " << error);
+ ErrorCallback_(error);
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
PassAway();
-
- // Currently there is no better way to handle the error.
- // Since the message was not sent from the actor system, there is no one to send the error message to.
- Y_ABORT("Error: %s", error.c_str());
}
void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
- bool isSent = Send(recipient, ev, flags);
- Y_ABORT_UNLESS(isSent, "Event was not sent");
+ if (!Send(recipient, ev, flags)) FailWithError("Event was not sent");
}
private:
@@ -244,15 +243,16 @@ private:
bool IsInitialized_ = false;
- std::function<void()> WakeupCallback_;
+ TWakeUpCallback WakeupCallback_;
+ TErrorCallback ErrorCallback_;
TSet<TKey> StoredBlobs_;
};
} // anonymous namespace
-IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) {
- return new TDqComputeStorageActor(txId, spillerName, wakeupCallback);
+IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback) {
+ return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, errorCallback);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
index 5c680a54bf..6ee1636332 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
@@ -61,6 +61,6 @@ struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents:
NThreading::TPromise<void> Promise_;
};
-IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback);
+IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spiller_factory.h b/ydb/library/yql/dq/actors/spilling/spiller_factory.h
index b8c0bc0090..41a3466a2f 100644
--- a/ydb/library/yql/dq/actors/spilling/spiller_factory.h
+++ b/ydb/library/yql/dq/actors/spilling/spiller_factory.h
@@ -11,21 +11,23 @@ using namespace NActors;
class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory
{
public:
- TDqSpillerFactory(TTxId txId, TActorSystem* actorSystem, std::function<void()> wakeUpCallback)
+ TDqSpillerFactory(TTxId txId, TActorSystem* actorSystem, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback)
: ActorSystem_(actorSystem),
TxId_(txId),
- WakeUpCallback_(wakeUpCallback)
+ WakeUpCallback_(wakeUpCallback),
+ ErrorCallback_(errorCallback)
{
}
NKikimr::NMiniKQL::ISpiller::TPtr CreateSpiller() override {
- return std::make_shared<TDqComputeStorage>(TxId_, WakeUpCallback_, ActorSystem_);
+ return std::make_shared<TDqComputeStorage>(TxId_, WakeUpCallback_, ErrorCallback_, ActorSystem_);
}
private:
TActorSystem* ActorSystem_;
TTxId TxId_;
- std::function<void()> WakeUpCallback_;
+ TWakeUpCallback WakeUpCallback_;
+ TErrorCallback ErrorCallback_;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 33cb50221d..b5c6b4a677 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -86,6 +86,7 @@ public:
}
private:
+
void OnStatisticsRequest(TEvStatistics::TPtr& ev) {
THashMap<ui32, const IDqAsyncOutputBuffer*> sinks;
@@ -441,7 +442,8 @@ private:
if (settings.GetEnableSpilling()) {
auto wakeUpCallback = ev->Get()->ExecCtx->GetWakeupCallback();
- TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback));
+ auto errorCallback = ev->Get()->ExecCtx->GetErrorCallback();
+ TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback, errorCallback));
}
auto event = MakeHolder<TEvTaskRunnerCreateFinished>(
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h
index 557b3abaa3..e7ed60275b 100644
--- a/ydb/library/yql/dq/common/dq_common.h
+++ b/ydb/library/yql/dq/common/dq_common.h
@@ -11,6 +11,9 @@ using TTxId = std::variant<ui64, TString>;
using TLogFunc = std::function<void(const TString& message)>;
+using TWakeUpCallback = std::function<void()>;
+using TErrorCallback = std::function<void(const TString& error)>;
+
template <ui32 TEventSpaceBegin, ui32 TEventSpaceDiff = 0>
struct TBaseDqResManEvents {
enum {
diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h
index eaafab3180..62dcdebdb8 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_storage.h
+++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h
@@ -15,8 +15,6 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
public:
using TPtr = TIntrusivePtr<IDqChannelStorage>;
- using TWakeUpCallback = std::function<void()>;
-
public:
virtual ~IDqChannelStorage() = default;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 065aa97d1b..597714913c 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -138,7 +138,8 @@ public:
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const = 0;
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const = 0;
- virtual std::function<void()> GetWakeupCallback() const = 0;
+ virtual TWakeUpCallback GetWakeupCallback() const = 0;
+ virtual TErrorCallback GetErrorCallback() const = 0;
virtual TTxId GetTxId() const = 0;
};
@@ -161,7 +162,11 @@ public:
return {};
};
- std::function<void()> GetWakeupCallback() const override {
+ TWakeUpCallback GetWakeupCallback() const override {
+ return {};
+ }
+
+ TErrorCallback GetErrorCallback() const override {
return {};
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index f7a33da883..c34ba8e1e5 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -280,8 +280,9 @@ private:
limits.ChannelBufferSize = 20_MB;
limits.OutputChunkMaxSize = 2_MB;
- auto wakeup = [this]{ ResumeExecution(EResumeSource::Default); };
- std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TraceId, std::move(wakeup));
+ auto wakeupCallback = [this]{ ResumeExecution(EResumeSource::Default); };
+ auto errorCallback = [this](const TString& error){ this->Send(this->SelfId(), new TEvDqFailure(StatusIds::INTERNAL_ERROR, error)); };
+ std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TraceId, std::move(wakeupCallback), std::move(errorCallback));
Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, NDqProto::DQ_STATS_MODE_BASIC, execCtx));
}