diff options
author | hor911 <hor911@ydb.tech> | 2022-08-24 14:59:47 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-08-24 14:59:47 +0300 |
commit | 509e3a03d27f53b7abd4c6ed7cd07e0d2501f69b (patch) | |
tree | f346f784f5e4322b2de8628c4fdef7b74898e02e | |
parent | cd18da6559baca915feda6ab8972b599363774e7 (diff) | |
download | ydb-509e3a03d27f53b7abd4c6ed7cd07e0d2501f69b.tar.gz |
Replace bool isFatal with StatusCode
11 files changed, 54 insertions, 47 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 40ea4e6dac6..9b75341d47e 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -545,7 +545,7 @@ private: NYql::TIssues issues; issues.AddIssue(std::move(issue)); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } private: diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index d7a0a6cd5d7..50475703650 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -55,15 +55,15 @@ struct IDqComputeActorAsyncInput { }; struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> { - TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal) + TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) : InputIndex(inputIndex) , Issues(issues) - , IsFatal(isFatal) + , FatalCode(fatalCode) {} const ui64 InputIndex; const TIssues Issues; - const bool IsFatal; + const NYql::NDqProto::StatusIds::StatusCode FatalCode; }; virtual ui64 GetInputIndex() const = 0; @@ -110,7 +110,7 @@ struct IDqComputeActorAsyncInput { struct IDqComputeActorAsyncOutput { struct ICallbacks { // Compute actor virtual void ResumeExecution() = 0; - virtual void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; + virtual void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) = 0; // Checkpointing virtual void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 28c03341e55..c71f512809c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -55,8 +55,8 @@ namespace NDq { constexpr ui32 IssuesBufferSize = 16; struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { - void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final { - OnSinkError(outputIndex, issues, isFatal); + void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final { + OnSinkError(outputIndex, issues, fatalCode); } void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final { @@ -67,14 +67,14 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { OnSinkFinished(outputIndex); } - virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; + virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) = 0; virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; virtual void OnSinkFinished(ui64 outputIndex) = 0; }; struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { - void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final { - OnOutputTransformError(outputIndex, issues, isFatal); + void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final { + OnOutputTransformError(outputIndex, issues, fatalCode); } void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final { @@ -85,7 +85,7 @@ struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks OnTransformFinished(outputIndex); } - virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; + virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) = 0; virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; virtual void OnTransformFinished(ui64 outputIndex) = 0; }; @@ -1523,52 +1523,52 @@ protected: void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { if (SourcesMap.FindPtr(ev->Get()->InputIndex)) { - OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->FatalCode); } else if (InputTransformsMap.FindPtr(ev->Get()->InputIndex)) { - OnInputTransformError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + OnInputTransformError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->FatalCode); } else { YQL_ENSURE(false, "Unexpected input index: " << ev->Get()->InputIndex); } } - void OnSourceError(ui64 inputIndex, const TIssues& issues, bool isFatal) { - if (!isFatal) { + void OnSourceError(ui64 inputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) { + if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { SourcesMap.at(inputIndex).IssuesBuffer.Push(issues); return; } CA_LOG_E("Source[" << inputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); + InternalError(fatalCode, issues); } - void OnInputTransformError(ui64 inputIndex, const TIssues& issues, bool isFatal) { - if (!isFatal) { + void OnInputTransformError(ui64 inputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) { + if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { InputTransformsMap.at(inputIndex).IssuesBuffer.Push(issues); return; } CA_LOG_E("InputTransform[" << inputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); + InternalError(fatalCode, issues); } - void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { - if (!isFatal) { + void OnSinkError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override { + if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { SinksMap.at(outputIndex).IssuesBuffer.Push(issues); return; } CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); + InternalError(fatalCode, issues); } - void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { - if (!isFatal) { + void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override { + if (fatalCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { OutputTransformsMap.at(outputIndex).IssuesBuffer.Push(issues); return; } CA_LOG_E("OutputTransform[" << outputIndex << "] fatal error: " << issues.ToOneLineString()); - InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); + InternalError(fatalCode, issues); } bool AllAsyncOutputsFinished() const { diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp index e69ca5593c6..aaebb97f036 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp @@ -110,7 +110,7 @@ private: } void Handle(TEvPrivate::TEvReadError::TPtr& result) { - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } // IActor & IDqComputeActorAsyncInput diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 2ce1acd7aa5..12a1af40af1 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -77,8 +77,8 @@ class TFakeActor : public NActors::TActor<TFakeActor> { Parent.AsyncInputPromises.NewAsyncInputDataArrived = NThreading::NewPromise(); } - void OnAsyncInputError(ui64, const TIssues& issues, bool isFatal) { - Y_UNUSED(isFatal); + void OnAsyncInputError(ui64, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) { + Y_UNUSED(fatalCode); Parent.AsyncInputPromises.FatalError.SetValue(issues); Parent.AsyncInputPromises.FatalError = NThreading::NewPromise<TIssues>(); } @@ -94,8 +94,8 @@ class TFakeActor : public NActors::TActor<TFakeActor> { Parent.AsyncOutputPromises.ResumeExecution = NThreading::NewPromise(); }; - void OnAsyncOutputError(ui64, const TIssues& issues, bool isFatal) override { - Y_UNUSED(isFatal); + void OnAsyncOutputError(ui64, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override { + Y_UNUSED(fatalCode); Parent.AsyncOutputPromises.Issue.SetValue(issues); Parent.AsyncOutputPromises.Issue = NThreading::NewPromise<TIssues>(); }; @@ -150,7 +150,7 @@ private: } void Handle(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { - AsyncInputEvents.OnAsyncInputError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + AsyncInputEvents.OnAsyncInputError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->FatalCode); } public: diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 2ffdbb76fc9..02914112308 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -662,7 +662,11 @@ private: } void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { Y_UNUSED(ev->Get()->InputIndex); - SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString())); + auto fatalCode = ev->Get()->FatalCode; + if (fatalCode != NYql::NDqProto::StatusIds::UNSPECIFIED) { + fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; + } + SendFailure(MakeHolder<TEvDqFailure>(fatalCode, ev->Get()->Issues.ToString())); } void OnAsyncInputPushFinished(TEvAsyncInputPushFinished::TPtr& ev, const TActorContext& ctx) { auto index = ev->Get()->Index; @@ -676,9 +680,12 @@ private: Send(SelfId(), new TEvContinueRun()); } - void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { + void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override { Y_UNUSED(outputIndex); - SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString())); + if (fatalCode != NYql::NDqProto::StatusIds::UNSPECIFIED) { + fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; + } + SendFailure(MakeHolder<TEvDqFailure>(fatalCode, issues.ToString())); } void OnAsyncOutputFinished(ui64 outputIndex) override { diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index a51fcd240bf..ce7879f45a3 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -276,7 +276,7 @@ private: if (issues) { WriteSession->Close(TDuration::Zero()); WriteSession.reset(); - Callbacks->OnAsyncOutputError(OutputIndex, *issues, true); + Callbacks->OnAsyncOutputError(OutputIndex, *issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); break; } @@ -312,7 +312,7 @@ private: void Fail(TString message) { TIssues issues; issues.AddIssue(message); - Callbacks->OnAsyncOutputError(OutputIndex, issues, true); + Callbacks->OnAsyncOutputError(OutputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); } struct TPQEventProcessor { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9562f5e6201..b66b29bf450 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -217,7 +217,7 @@ private: void Handle(TEvPrivate::TEvReadError::TPtr& result) { ++IsDoneCounter; - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } // IActor & IDqComputeActorAsyncInput @@ -412,19 +412,19 @@ private: } if (Issues) - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); else Send(ParentActorId, new TEvPrivate::TEvReadFinished); } catch (const TDtorException&) { return RetryStuff->Cancel(); } catch (const std::exception& err) { - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); return; } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { TString message = Sprintf("Unexpected message type 0x%08" PRIx32, ev->GetTypeRewrite()); - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)}, true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } private: const ui64 InputIndex; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 4c1800ab917..65bfd5c53b5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -382,7 +382,7 @@ private: } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true); + Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); } void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) { diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 0ab2a01e18c..a3cfb56a2c1 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -273,7 +273,7 @@ private: TIssues issues { TIssue(errorBuilder) }; SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString()); - Callbacks->OnAsyncOutputError(OutputIndex, issues, res->IsTerminal); + Callbacks->OnAsyncOutputError(OutputIndex, issues, res->IsTerminal ? NYql::NDqProto::StatusIds::EXTERNAL_ERROR : NYql::NDqProto::StatusIds::UNSPECIFIED); return; } @@ -316,7 +316,7 @@ private: SendingBuffer.emplace(TMetricsToSend { std::move(data), metricsCount }); } catch (const yexception& e) { TIssues issues { TIssue(TStringBuilder() << "Error while encoding solomon metrics: " << e.what()) }; - Callbacks->OnAsyncOutputError(OutputIndex, issues, true); + Callbacks->OnAsyncOutputError(OutputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); } metricsCount = 0; @@ -419,7 +419,7 @@ private: if (!parser.Parse(TString(response.Response->Body), &res)) { TIssues issues { TIssue(TStringBuilder() << "Invalid monitoring response: " << response.Response->GetObfuscatedData()) }; SINK_LOG_E("Failed to parse response[" << cookie << "] from solomon: " << issues.ToOneLineString()); - Callbacks->OnAsyncOutputError(OutputIndex, issues, true); + Callbacks->OnAsyncOutputError(OutputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); return; } Y_VERIFY(res.size() == 2); @@ -428,7 +428,7 @@ private: if (ptr == InflightBuffer.end()) { SINK_LOG_E("Solomon response[" << cookie << "] was not found in inflight"); TIssues issues { TIssue(TStringBuilder() << "Internal error in monitoring writer") }; - Callbacks->OnAsyncOutputError(OutputIndex, issues, true); + Callbacks->OnAsyncOutputError(OutputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); return; } @@ -437,7 +437,7 @@ private: if (writtenMetricsCount != ptr->second.MetricsCount) { // TODO: YQ-340 // TIssues issues { TIssue(TStringBuilder() << ToString(ptr->second.MetricsCount - writtenMetricsCount) << " metrics were not written: " << res[1]) }; - // Callbacks->OnAsyncOutputError(OutputIndex, issues, true); + // Callbacks->OnAsyncOutputError(OutputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); // return; SINK_LOG_W("Some metrics were not written. MetricsCount=" << ptr->second.MetricsCount << " writtenMetricsCount=" << writtenMetricsCount << " Solomon response: " << response.Response->GetObfuscatedData()); } diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp index 8efe0d995c8..ae7e78e0c7e 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp @@ -191,7 +191,7 @@ private: RequestsDone = true; while(!Blocks.empty()) Blocks.pop(); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, res.GetIssues(), true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, res.GetIssues(), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } else { WakeUpTime = TMonotonic::Now() + Min(TDuration::Seconds(3), TDuration::MilliSeconds(0x30U * (1U << ++Retried))); ActorSystem->Schedule(WakeUpTime, new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvRetryTime)); |