aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-04 17:43:53 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-04 17:43:53 +0300
commit383a0afb5dc1ef421f9a968e31ab7ead4cb9c52a (patch)
tree1b07374c9be1e2d4ef238677e4ad2596dfa8a656
parent140757450482b97591047d49221ef0a0f513a32d (diff)
downloadydb-383a0afb5dc1ef421f9a968e31ab7ead4cb9c52a.tar.gz
graceful exceptions processing
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp12
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h11
2 files changed, 16 insertions, 7 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index e86c93c19f..876116fe92 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -434,14 +434,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvInterconnect::TEvNodeDisconnected::T
}
}
-bool TKqpScanFetcherActor::SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssuesIds::EIssueCode issueCode, const TString& message) {
+bool TKqpScanFetcherActor::SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssuesIds::EIssueCode issueCode, const TString& message) const {
for (auto&& i : ComputeActorIds) {
Send(i, new TEvScanExchange::TEvTerminateFromFetcher(statusCode, issueCode, message));
}
return true;
}
-bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
+bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) const {
for (auto&& i : ComputeActorIds) {
Send(i, new TEvScanExchange::TEvTerminateFromFetcher(state, statusCode, issues));
}
@@ -823,4 +823,12 @@ void TKqpScanFetcherActor::DoAckAvailableWaiting() {
}
}
+void TKqpScanFetcherActor::StopOnError(const TString& errorMessage) const {
+ CA_LOG_E("unexpected problem: " << errorMessage);
+ TIssue issue(errorMessage);
+ TIssues issues;
+ issues.AddIssue(std::move(issue));
+ SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues);
+}
+
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index d4aaf39ec5..a87790969f 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -82,10 +82,10 @@ public:
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
- Y_FAIL("unexpected message");
+ StopOnError("unexpected message on data fetching: " + ::ToString(ev->GetTypeRewrite()));
}
- } catch (const yexception& e) {
- Y_FAIL("UNEXPECTED EXCEPTION: %s", e.what());
+ } catch (...) {
+ StopOnError("unexpected exception: " + CurrentExceptionMessage());
}
}
@@ -97,9 +97,10 @@ private:
std::vector<NActors::TActorId> ComputeActorIds;
- bool SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssuesIds::EIssueCode issueCode, const TString& message);
+ void StopOnError(const TString& errorMessage) const;
+ bool SendGlobalFail(const NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssuesIds::EIssueCode issueCode, const TString& message) const;
- bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues);
+ bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues) const;
bool ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state);