diff options
author | Filitov Mikhail <filitovme@gmail.com> | 2024-08-09 19:08:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-09 19:08:53 +0200 |
commit | 49e62b47eec455deb9139643a0f6b56960352b9a (patch) | |
tree | 3ff6adcc2e1edaa2ca1fcde9c32bb4ca4f1cb784 | |
parent | ee6b772c5f61986bc4b03006fb56680cd91a3a30 (diff) | |
download | ydb-49e62b47eec455deb9139643a0f6b56960352b9a.tar.gz |
Handle spilling errors correctly (#7435)
25 files changed, 145 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index 2633037dcb..bdfbb2eeb7 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -14,8 +14,8 @@ using namespace NYql::NDq; class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { public: - TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp) - : TDqTaskRunnerExecutionContext(txId, std::move(wakeUp)) + TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback) + : TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback)) , WithSpilling_(withSpilling) { } diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 2dbe94b66b..ab43bc1e5f 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() { auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger); SetTaskRunner(taskRunner); - auto wakeup = [this]{ ContinueExecute(); }; + auto wakeupCallback = [this]{ ContinueExecute(); }; + auto errorCallback = [this](const TString& error){ SendError(error); }; try { - PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup))); + PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback))); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 5fd47645cc..6f3f43e91b 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -220,7 +220,8 @@ void TKqpScanComputeActor::DoBootstrap() { TBase::SetTaskRunner(taskRunner); auto wakeup = [this] { ContinueExecute(); }; - TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup))); + auto errorCallback = [this](const TString& error){ SendError(error); }; + TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback))); ComputeCtx.AddTableScan(0, Meta, GetStatsMode()); ScanData = &ComputeCtx.GetTableScan(0); diff --git a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp index 691c3d308f..ab5ff38287 100644 --- a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp +++ b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp @@ -32,7 +32,7 @@ NKikimrConfig::TAppConfig AppCfg() { return appCfg; } -NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) { +NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool enableSpilling=true) { NKikimrConfig::TAppConfig appCfg; auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); @@ -43,12 +43,32 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) { auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig(); - spilling->SetEnable(true); + spilling->SetEnable(enableSpilling); spilling->SetRoot("./spilling/"); return appCfg; } +void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) { + for (ui32 i = 0; i < numRows; ++i) { + auto result = db.ExecuteQuery(Sprintf(R"( + --!syntax_v1 + REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s") + )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } +} + +constexpr auto SimpleGraceJoinWithSpillingQuery = R"( + --!syntax_v1 + PRAGMA ydb.EnableSpillingNodes="GraceJoin"; + PRAGMA ydb.CostBasedOptimizationLevel='0'; + PRAGMA ydb.HashJoinMode='graceandself'; + select t1.Key, t1.Value, t2.Key, t2.Value + from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value + order by t1.Value + )"; + } // anonymous namespace @@ -79,31 +99,15 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) { auto db = kikimr.GetQueryClient(); - for (ui32 i = 0; i < 300; ++i) { - auto result = db.ExecuteQuery(Sprintf(R"( - --!syntax_v1 - REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s") - )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - auto query = R"( - --!syntax_v1 - PRAGMA ydb.EnableSpillingNodes="GraceJoin"; - PRAGMA ydb.CostBasedOptimizationLevel='0'; - PRAGMA ydb.HashJoinMode='graceandself'; - select t1.Key, t1.Value, t2.Key, t2.Value - from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value - order by t1.Value - )"; + FillTableWithData(db); auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain); - auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync(); + auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString()); Cerr << planres.GetStats()->GetAst() << Endl; - auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync(); + auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); @@ -116,6 +120,24 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) { } } +Y_UNIT_TEST(HandleErrorsCorrectly) { + Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl; + TKikimrRunner kikimr(AppCfgLowComputeLimits(0.01, false)); + + auto db = kikimr.GetQueryClient(); + + FillTableWithData(db); + + auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain); + auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString()); + + Cerr << planres.GetStats()->GetAst() << Endl; + + auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString()); +} + Y_UNIT_TEST(SelfJoinQueryService) { Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index af3eb3c31c..d61431eb16 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1012,7 +1012,11 @@ public: return {}; } - std::function<void()> GetWakeupCallback() const override { + NDq::TWakeUpCallback GetWakeupCallback() const override { + return {}; + } + + NDq::TErrorCallback GetErrorCallback() const override { return {}; } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index f6629fc5b5..dac377c889 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -127,8 +127,9 @@ public: Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>); - auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; - std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup)); + auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; + auto errorCallback = [this](const TString& error){ SendError(error); }; + std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeupCallback), std::move(errorCallback)); Send(TaskRunnerActorId, new NTaskRunnerActor::TEvTaskRunnerCreate( diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 8851f301f7..2f6335e11d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -58,8 +58,9 @@ public: auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger); SetTaskRunner(taskRunner); - auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; - TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup)); + auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; + auto errorCallback = [this](const TString& error){ SendError(error); }; + TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback)); PrepareTaskRunner(execCtx); ContinueExecute(EResumeSource::CABootstrap); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 4baded583b..9536882a2a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -629,6 +629,10 @@ protected: } } + void SendError(const TString& error) { + this->Send(this->SelfId(), TEvDq::TEvAbortExecution::InternalError(error)); + } + protected: //TDqComputeActorChannels::ICallbacks //i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 8a776d83fb..9f139c5ea9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -213,7 +213,7 @@ protected: TaskRunner->Prepare(this->Task, limits, execCtx); if (this->Task.GetEnableSpilling()) { - TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback())); + TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback())); } for (auto& [channelId, channel] : this->InputChannelsMap) { diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp index 2f50fcbd5a..764920be05 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -6,9 +6,10 @@ namespace NYql { namespace NDq { -TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp) +TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback) : TxId_(txId) - , WakeUp_(std::move(wakeUp)) + , WakeUpCallback_(std::move(wakeUpCallback)) + , ErrorCallback_(std::move(errorCallback)) { } @@ -18,14 +19,18 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const { if (withSpilling) { - return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem); + return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem); } else { return nullptr; } } -std::function<void()> TDqTaskRunnerExecutionContext::GetWakeupCallback() const { - return WakeUp_; +TWakeUpCallback TDqTaskRunnerExecutionContext::GetWakeupCallback() const { + return WakeUpCallback_; +} + +TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const { + return ErrorCallback_; } TTxId TDqTaskRunnerExecutionContext::GetTxId() const { diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h index f7fc7a0dea..ac0bc49482 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -9,17 +9,19 @@ namespace NDq { class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { public: - TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp); + TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& WakeUpCallback_, TErrorCallback&& ErrorCallback_); IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override; IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override; - std::function<void()> GetWakeupCallback() const override; + TWakeUpCallback GetWakeupCallback() const override; + TErrorCallback GetErrorCallback() const override; TTxId GetTxId() const override; private: const TTxId TxId_; - const IDqChannelStorage::TWakeUpCallback WakeUp_; + const TWakeUpCallback WakeUpCallback_; + const TErrorCallback ErrorCallback_; }; } // namespace NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index 79ca0fdae1..90f5a9a0e4 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -30,10 +30,10 @@ class TDqChannelStorage : public IDqChannelStorage { NThreading::TFuture<void> IsBlobWrittenFuture_; }; public: - TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { - ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); + ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem); ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor()); } @@ -119,9 +119,12 @@ private: } // anonymous namespace -IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem) +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, + TWakeUpCallback wakeUpCallback, + TErrorCallback errorCallback, + TActorSystem* actorSystem) { - return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem); + return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h index 03ce1b8f6f..8588fd6519 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h @@ -11,6 +11,8 @@ namespace NActors { namespace NYql::NDq { IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, - IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem); + TWakeUpCallback wakeUpCallback, + TErrorCallback errorCallback, + NActors::TActorSystem* actorSystem); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp index 4418ff61dd..b00e505eb9 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -43,10 +43,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, using TBase = TActorBootstrapped<TDqChannelStorageActor>; public: - TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) + TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem) : TxId_(txId) , ChannelId_(channelId) - , WakeUp_(std::move(wakeUp)) + , WakeUpCallback_(std::move(wakeUpCallback)) + , ErrorCallback_(std::move(errorCallback)) , ActorSystem_(actorSystem) {} @@ -65,13 +66,12 @@ public: protected: void FailWithError(const TString& error) { + if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str()); + LOG_E("Error: " << error); + ErrorCallback_(error); SendInternal(SpillingActorId_, new TEvents::TEvPoison); PassAway(); - - // Currently there is no better way to handle the error. - // Since the message was not sent from the actor system, there is no one to send the error message to. - Y_ABORT("Error: %s", error.c_str()); } void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) { @@ -130,7 +130,7 @@ private: it->second.SetValue(); WritingBlobs_.erase(it); - WakeUp_(); + WakeUpCallback_(); } void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { @@ -146,7 +146,7 @@ private: it->second.SetValue(std::move(msg.Blob)); LoadingBlobs_.erase(it); - WakeUp_(); + WakeUpCallback_(); } void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { @@ -163,7 +163,8 @@ private: private: const TTxId TxId_; const ui64 ChannelId_; - IDqChannelStorage::TWakeUpCallback WakeUp_; + TWakeUpCallback WakeUpCallback_; + TErrorCallback ErrorCallback_; TActorId SpillingActorId_; // BlobId -> promise that blob is saved @@ -177,8 +178,12 @@ private: } // anonymous namespace -IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) { - return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); +IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, + TWakeUpCallback&& wakeUpCallback, + TErrorCallback&& errorCallback, + NActors::TActorSystem* actorSystem) +{ + return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h index 7b509af38a..cf9f9c997b 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h @@ -49,6 +49,6 @@ public: virtual NActors::IActor* GetActor() = 0; }; -IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem); +IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, NActors::TActorSystem* actorSystem); } // namespace NYql::NDq
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp index cba96b115a..e6d4ea468e 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp @@ -6,10 +6,10 @@ namespace NYql::NDq { using namespace NActors; -TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { +TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { TStringStream spillerName; spillerName << "Spiller" << "_" << CreateGuidAsString(); - ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback); + ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback); ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor()); } diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h index 695770bfef..ca8d19c142 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h @@ -17,7 +17,7 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller { public: - TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, NActors::TActorSystem* actorSystem); + TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, NActors::TActorSystem* actorSystem); ~TDqComputeStorage(); diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp index 77b37f2a3e..47c800b5bd 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp @@ -40,10 +40,11 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor // void promise that completes when block is removed using TDeletingBlobInfo = NThreading::TPromise<void>; public: - TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) + TDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback) : TxId_(txId), SpillerName_(spillerName), - WakeupCallback_(wakeupCallback) + WakeupCallback_(wakeupCallback), + ErrorCallback_(errorCallback) { } @@ -63,18 +64,16 @@ public: protected: void FailWithError(const TString& error) { + if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str()); + LOG_E("Error: " << error); + ErrorCallback_(error); SendInternal(SpillingActorId_, new TEvents::TEvPoison); PassAway(); - - // Currently there is no better way to handle the error. - // Since the message was not sent from the actor system, there is no one to send the error message to. - Y_ABORT("Error: %s", error.c_str()); } void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) { - bool isSent = Send(recipient, ev, flags); - Y_ABORT_UNLESS(isSent, "Event was not sent"); + if (!Send(recipient, ev, flags)) FailWithError("Event was not sent"); } private: @@ -244,15 +243,16 @@ private: bool IsInitialized_ = false; - std::function<void()> WakeupCallback_; + TWakeUpCallback WakeupCallback_; + TErrorCallback ErrorCallback_; TSet<TKey> StoredBlobs_; }; } // anonymous namespace -IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) { - return new TDqComputeStorageActor(txId, spillerName, wakeupCallback); +IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback) { + return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, errorCallback); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h index 5c680a54bf..6ee1636332 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h @@ -61,6 +61,6 @@ struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents: NThreading::TPromise<void> Promise_; }; -IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback); +IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spiller_factory.h b/ydb/library/yql/dq/actors/spilling/spiller_factory.h index b8c0bc0090..41a3466a2f 100644 --- a/ydb/library/yql/dq/actors/spilling/spiller_factory.h +++ b/ydb/library/yql/dq/actors/spilling/spiller_factory.h @@ -11,21 +11,23 @@ using namespace NActors; class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory { public: - TDqSpillerFactory(TTxId txId, TActorSystem* actorSystem, std::function<void()> wakeUpCallback) + TDqSpillerFactory(TTxId txId, TActorSystem* actorSystem, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback) : ActorSystem_(actorSystem), TxId_(txId), - WakeUpCallback_(wakeUpCallback) + WakeUpCallback_(wakeUpCallback), + ErrorCallback_(errorCallback) { } NKikimr::NMiniKQL::ISpiller::TPtr CreateSpiller() override { - return std::make_shared<TDqComputeStorage>(TxId_, WakeUpCallback_, ActorSystem_); + return std::make_shared<TDqComputeStorage>(TxId_, WakeUpCallback_, ErrorCallback_, ActorSystem_); } private: TActorSystem* ActorSystem_; TTxId TxId_; - std::function<void()> WakeUpCallback_; + TWakeUpCallback WakeUpCallback_; + TErrorCallback ErrorCallback_; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 33cb50221d..b5c6b4a677 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -86,6 +86,7 @@ public: } private: + void OnStatisticsRequest(TEvStatistics::TPtr& ev) { THashMap<ui32, const IDqAsyncOutputBuffer*> sinks; @@ -441,7 +442,8 @@ private: if (settings.GetEnableSpilling()) { auto wakeUpCallback = ev->Get()->ExecCtx->GetWakeupCallback(); - TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback)); + auto errorCallback = ev->Get()->ExecCtx->GetErrorCallback(); + TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback, errorCallback)); } auto event = MakeHolder<TEvTaskRunnerCreateFinished>( diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index 557b3abaa3..e7ed60275b 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -11,6 +11,9 @@ using TTxId = std::variant<ui64, TString>; using TLogFunc = std::function<void(const TString& message)>; +using TWakeUpCallback = std::function<void()>; +using TErrorCallback = std::function<void(const TString& error)>; + template <ui32 TEventSpaceBegin, ui32 TEventSpaceDiff = 0> struct TBaseDqResManEvents { enum { diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h index eaafab3180..62dcdebdb8 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_storage.h +++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h @@ -15,8 +15,6 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> { public: using TPtr = TIntrusivePtr<IDqChannelStorage>; - using TWakeUpCallback = std::function<void()>; - public: virtual ~IDqChannelStorage() = default; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 065aa97d1b..597714913c 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -138,7 +138,8 @@ public: virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const = 0; - virtual std::function<void()> GetWakeupCallback() const = 0; + virtual TWakeUpCallback GetWakeupCallback() const = 0; + virtual TErrorCallback GetErrorCallback() const = 0; virtual TTxId GetTxId() const = 0; }; @@ -161,7 +162,11 @@ public: return {}; }; - std::function<void()> GetWakeupCallback() const override { + TWakeUpCallback GetWakeupCallback() const override { + return {}; + } + + TErrorCallback GetErrorCallback() const override { return {}; } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index f7a33da883..c34ba8e1e5 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -280,8 +280,9 @@ private: limits.ChannelBufferSize = 20_MB; limits.OutputChunkMaxSize = 2_MB; - auto wakeup = [this]{ ResumeExecution(EResumeSource::Default); }; - std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TraceId, std::move(wakeup)); + auto wakeupCallback = [this]{ ResumeExecution(EResumeSource::Default); }; + auto errorCallback = [this](const TString& error){ this->Send(this->SelfId(), new TEvDqFailure(StatusIds::INTERNAL_ERROR, error)); }; + std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TraceId, std::move(wakeupCallback), std::move(errorCallback)); Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, NDqProto::DQ_STATS_MODE_BASIC, execCtx)); } |