diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-04 17:43:53 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-04 17:43:53 +0300 |
commit | 383a0afb5dc1ef421f9a968e31ab7ead4cb9c52a (patch) | |
tree | 1b07374c9be1e2d4ef238677e4ad2596dfa8a656 | |
parent | 140757450482b97591047d49221ef0a0f513a32d (diff) | |
download | ydb-383a0afb5dc1ef421f9a968e31ab7ead4cb9c52a.tar.gz |
graceful exceptions processing
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h | 11 |
2 files changed, 16 insertions, 7 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index e86c93c19f..876116fe92 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -434,14 +434,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvInterconnect::TEvNodeDisconnected::T } } -bool TKqpScanFetcherActor::SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssuesIds::EIssueCode issueCode, const TString& message) { +bool TKqpScanFetcherActor::SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssuesIds::EIssueCode issueCode, const TString& message) const { for (auto&& i : ComputeActorIds) { Send(i, new TEvScanExchange::TEvTerminateFromFetcher(statusCode, issueCode, message)); } return true; } -bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { +bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) const { for (auto&& i : ComputeActorIds) { Send(i, new TEvScanExchange::TEvTerminateFromFetcher(state, statusCode, issues)); } @@ -823,4 +823,12 @@ void TKqpScanFetcherActor::DoAckAvailableWaiting() { } } +void TKqpScanFetcherActor::StopOnError(const TString& errorMessage) const { + CA_LOG_E("unexpected problem: " << errorMessage); + TIssue issue(errorMessage); + TIssues issues; + issues.AddIssue(std::move(issue)); + SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues); +} + } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index d4aaf39ec5..a87790969f 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -82,10 +82,10 @@ public: IgnoreFunc(TEvInterconnect::TEvNodeConnected); IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); default: - Y_FAIL("unexpected message"); + StopOnError("unexpected message on data fetching: " + ::ToString(ev->GetTypeRewrite())); } - } catch (const yexception& e) { - Y_FAIL("UNEXPECTED EXCEPTION: %s", e.what()); + } catch (...) { + StopOnError("unexpected exception: " + CurrentExceptionMessage()); } } @@ -97,9 +97,10 @@ private: std::vector<NActors::TActorId> ComputeActorIds; - bool SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssuesIds::EIssueCode issueCode, const TString& message); + void StopOnError(const TString& errorMessage) const; + bool SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssuesIds::EIssueCode issueCode, const TString& message) const; - bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues); + bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues) const; bool ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state); |