aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-11 23:37:15 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-11 23:37:15 +0300
commit3ab6c0efc3b723aba302dec5243fe8868b330cde (patch)
treeab5f66fcb4594ffbce703f1704f4cbf5b0f25a40
parent3eef01c816b09f7a0ec64a381d835a53cc19631d (diff)
downloadydb-3ab6c0efc3b723aba302dec5243fe8868b330cde.tar.gz
YQ-356 Raise transient issues in case of warnings when mapping state to new query in continue mode
Add comment Raise transient issues ref:a1a894f4486091406d994b5749b5616dd64f44c7
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp25
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp5
-rw-r--r--ydb/core/yq/libs/events/event_ids.h1
-rw-r--r--ydb/core/yq/libs/events/events.h11
4 files changed, 34 insertions, 8 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 5549f8f254..4ec721aa15 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -180,16 +180,17 @@ private:
STRICT_STFUNC(StateFunc,
HFunc(TEvents::TEvAsyncContinue, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
- hFunc(NYq::TEvents::TEvGraphParams, Handle);
- hFunc(NYq::TEvents::TEvDataStreamsReadRulesCreationResult, Handle);
+ hFunc(TEvents::TEvGraphParams, Handle);
+ hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, Handle);
hFunc(NYql::NDqs::TEvQueryResponse, Handle);
hFunc(TEvents::TEvQueryActionResult, Handle);
hFunc(TEvents::TEvForwardPingResponse, Handle);
hFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle);
+ hFunc(TEvents::TEvRaiseTransientIssues, Handle);
)
STRICT_STFUNC(FinishStateFunc,
- hFunc(NYq::TEvents::TEvDataStreamsReadRulesCreationResult, HandleFinish);
+ hFunc(TEvents::TEvDataStreamsReadRulesCreationResult, HandleFinish);
hFunc(TEvents::TEvDataStreamsReadRulesDeletionResult, HandleFinish);
hFunc(NYql::NDqs::TEvQueryResponse, HandleFinish);
hFunc(TEvents::TEvForwardPingResponse, HandleFinish);
@@ -197,9 +198,10 @@ private:
// Ignore tail of action events after normal work.
IgnoreFunc(TEvents::TEvAsyncContinue);
IgnoreFunc(NActors::TEvents::TEvUndelivered);
- IgnoreFunc(NYq::TEvents::TEvGraphParams);
+ IgnoreFunc(TEvents::TEvGraphParams);
IgnoreFunc(TEvents::TEvQueryActionResult);
IgnoreFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone);
+ IgnoreFunc(TEvents::TEvRaiseTransientIssues);
)
void KillExecuter() {
@@ -496,7 +498,7 @@ private:
return false;
}
- void Handle(NYq::TEvents::TEvGraphParams::TPtr& ev) {
+ void Handle(TEvents::TEvGraphParams::TPtr& ev) {
LOG_D("Graph params with tasks: " << ev->Get()->GraphParams.TasksSize());
DqGraphParams.push_back(ev->Get()->GraphParams);
}
@@ -507,6 +509,14 @@ private:
SetLoadFromCheckpointMode();
}
+ void Handle(TEvents::TEvRaiseTransientIssues::TPtr& ev) {
+ Yq::Private::PingTaskRequest request;
+
+ NYql::IssuesToMessage(ev->Get()->TransientIssues, request.mutable_transient_issues());
+
+ Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, RaiseTransientIssuesCookie);
+ }
+
i32 UpdateResultIndices() {
i32 count = 0;
for (const auto& graphParams : DqGraphParams) {
@@ -733,7 +743,7 @@ private:
ContinueFinish();
}
- void Handle(NYq::TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) {
+ void Handle(TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) {
LOG_D("Read rules creation finished. Issues: " << ev->Get()->Issues.Size());
ReadRulesCreatorId = {};
if (ev->Get()->Issues) {
@@ -745,7 +755,7 @@ private:
}
}
- void HandleFinish(NYq::TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) {
+ void HandleFinish(TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) {
ReadRulesCreatorId = {};
if (ev->Get()->Issues) {
TransientIssues.AddIssues(ev->Get()->Issues);
@@ -1364,6 +1374,7 @@ private:
UpdateQueryInfoCookie,
SaveFinalizingStatusCookie,
SetLoadFromCheckpointModeCookie,
+ RaiseTransientIssuesCookie,
};
};
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
index a9138ef40f..6c976180dc 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -3,6 +3,7 @@
#include "checkpoint_coordinator.h"
#include <ydb/core/yq/libs/actors/logging/log.h>
+#include <ydb/core/yq/libs/events/events.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -124,7 +125,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord
const bool needCheckpointMetadata = StateLoadMode == YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT || StreamingDisposition.has_from_last_checkpoint();
if (needCheckpointMetadata) {
- const bool loadGraphDescription = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint();
+ const bool loadGraphDescription = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint(); // Continue mode
CC_LOG_I("Send TEvGetCheckpointsMetadataRequest; state load mode: " << YandexQuery::StateLoadMode_Name(StateLoadMode) << "; load graph: " << loadGraphDescription);
Send(StorageProxy,
new TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest(
@@ -230,6 +231,8 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
if (!result) {
Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues));
return;
+ } else { // Report as transient issues
+ Send(RunActorId, new TEvents::TEvRaiseTransientIssues(std::move(issues)));
}
PendingRestoreCheckpoint = TPendingRestoreCheckpoint(checkpoint.CheckpointId, false, ActorsToWaitForSet);
diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h
index 56032d50ea..b947b9d244 100644
--- a/ydb/core/yq/libs/events/event_ids.h
+++ b/ydb/core/yq/libs/events/event_ids.h
@@ -42,6 +42,7 @@ struct TEventIds {
EvForwardPingRequest,
EvForwardPingResponse,
EvGraphParams,
+ EvRaiseTransientIssues,
// Special events
EvEnd
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index 49405cfba4..36c864dca8 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -245,6 +245,17 @@ struct TEvents {
NProto::TGraphParams GraphParams;
NThreading::TPromise<NYql::IDqGateway::TResult> Result;
};
+
+ struct TEvRaiseTransientIssues : public NActors::TEventLocal<TEvRaiseTransientIssues, TEventIds::EvRaiseTransientIssues> {
+ TEvRaiseTransientIssues() = default;
+
+ explicit TEvRaiseTransientIssues(NYql::TIssues issues)
+ : TransientIssues(std::move(issues))
+ {
+ }
+
+ NYql::TIssues TransientIssues;
+ };
};
} // namespace NYq