diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-11 23:37:15 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-11 23:37:15 +0300 |
commit | 3ab6c0efc3b723aba302dec5243fe8868b330cde (patch) | |
tree | ab5f66fcb4594ffbce703f1704f4cbf5b0f25a40 | |
parent | 3eef01c816b09f7a0ec64a381d835a53cc19631d (diff) | |
download | ydb-3ab6c0efc3b723aba302dec5243fe8868b330cde.tar.gz |
YQ-356 Raise transient issues in case of warnings when mapping state to new query in continue mode
Add comment
Raise transient issues
ref:a1a894f4486091406d994b5749b5616dd64f44c7
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 25 | ||||
-rw-r--r-- | ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp | 5 | ||||
-rw-r--r-- | ydb/core/yq/libs/events/event_ids.h | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/events/events.h | 11 |
4 files changed, 34 insertions, 8 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 5549f8f254..4ec721aa15 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -180,16 +180,17 @@ private: STRICT_STFUNC(StateFunc, HFunc(TEvents::TEvAsyncContinue, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); - hFunc(NYq::TEvents::TEvGraphParams, Handle); - hFunc(NYq::TEvents::TEvDataStreamsReadRulesCreationResult, Handle); + hFunc(TEvents::TEvGraphParams, Handle); + hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, Handle); hFunc(NYql::NDqs::TEvQueryResponse, Handle); hFunc(TEvents::TEvQueryActionResult, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); hFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle); + hFunc(TEvents::TEvRaiseTransientIssues, Handle); ) STRICT_STFUNC(FinishStateFunc, - hFunc(NYq::TEvents::TEvDataStreamsReadRulesCreationResult, HandleFinish); + hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, HandleFinish); hFunc(TEvents::TEvDataStreamsReadRulesDeletionResult, HandleFinish); hFunc(NYql::NDqs::TEvQueryResponse, HandleFinish); hFunc(TEvents::TEvForwardPingResponse, HandleFinish); @@ -197,9 +198,10 @@ private: // Ignore tail of action events after normal work. IgnoreFunc(TEvents::TEvAsyncContinue); IgnoreFunc(NActors::TEvents::TEvUndelivered); - IgnoreFunc(NYq::TEvents::TEvGraphParams); + IgnoreFunc(TEvents::TEvGraphParams); IgnoreFunc(TEvents::TEvQueryActionResult); IgnoreFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone); + IgnoreFunc(TEvents::TEvRaiseTransientIssues); ) void KillExecuter() { @@ -496,7 +498,7 @@ private: return false; } - void Handle(NYq::TEvents::TEvGraphParams::TPtr& ev) { + void Handle(TEvents::TEvGraphParams::TPtr& ev) { LOG_D("Graph params with tasks: " << ev->Get()->GraphParams.TasksSize()); DqGraphParams.push_back(ev->Get()->GraphParams); } @@ -507,6 +509,14 @@ private: SetLoadFromCheckpointMode(); } + void Handle(TEvents::TEvRaiseTransientIssues::TPtr& ev) { + Yq::Private::PingTaskRequest request; + + NYql::IssuesToMessage(ev->Get()->TransientIssues, request.mutable_transient_issues()); + + Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, RaiseTransientIssuesCookie); + } + i32 UpdateResultIndices() { i32 count = 0; for (const auto& graphParams : DqGraphParams) { @@ -733,7 +743,7 @@ private: ContinueFinish(); } - void Handle(NYq::TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) { + void Handle(TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) { LOG_D("Read rules creation finished. Issues: " << ev->Get()->Issues.Size()); ReadRulesCreatorId = {}; if (ev->Get()->Issues) { @@ -745,7 +755,7 @@ private: } } - void HandleFinish(NYq::TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) { + void HandleFinish(TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) { ReadRulesCreatorId = {}; if (ev->Get()->Issues) { TransientIssues.AddIssues(ev->Get()->Issues); @@ -1364,6 +1374,7 @@ private: UpdateQueryInfoCookie, SaveFinalizingStatusCookie, SetLoadFromCheckpointModeCookie, + RaiseTransientIssuesCookie, }; }; diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index a9138ef40f..6c976180dc 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -3,6 +3,7 @@ #include "checkpoint_coordinator.h" #include <ydb/core/yq/libs/actors/logging/log.h> +#include <ydb/core/yq/libs/events/events.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> @@ -124,7 +125,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord const bool needCheckpointMetadata = StateLoadMode == YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT || StreamingDisposition.has_from_last_checkpoint(); if (needCheckpointMetadata) { - const bool loadGraphDescription = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint(); + const bool loadGraphDescription = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint(); // Continue mode CC_LOG_I("Send TEvGetCheckpointsMetadataRequest; state load mode: " << YandexQuery::StateLoadMode_Name(StateLoadMode) << "; load graph: " << loadGraphDescription); Send(StorageProxy, new TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest( @@ -230,6 +231,8 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe if (!result) { Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues)); return; + } else { // Report as transient issues + Send(RunActorId, new TEvents::TEvRaiseTransientIssues(std::move(issues))); } PendingRestoreCheckpoint = TPendingRestoreCheckpoint(checkpoint.CheckpointId, false, ActorsToWaitForSet); diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h index 56032d50ea..b947b9d244 100644 --- a/ydb/core/yq/libs/events/event_ids.h +++ b/ydb/core/yq/libs/events/event_ids.h @@ -42,6 +42,7 @@ struct TEventIds { EvForwardPingRequest, EvForwardPingResponse, EvGraphParams, + EvRaiseTransientIssues, // Special events EvEnd diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 49405cfba4..36c864dca8 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -245,6 +245,17 @@ struct TEvents { NProto::TGraphParams GraphParams; NThreading::TPromise<NYql::IDqGateway::TResult> Result; }; + + struct TEvRaiseTransientIssues : public NActors::TEventLocal<TEvRaiseTransientIssues, TEventIds::EvRaiseTransientIssues> { + TEvRaiseTransientIssues() = default; + + explicit TEvRaiseTransientIssues(NYql::TIssues issues) + : TransientIssues(std::move(issues)) + { + } + + NYql::TIssues TransientIssues; + }; }; } // namespace NYq |