summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp | 17
-rw-r--r-- ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp | 5
-rw-r--r-- ydb/core/fq/libs/checkpointing/pending_checkpoint.h | 3
-rw-r--r-- ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp | 1
4 files changed, 20 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
index f1a00395452..6c2b5d9afd3 100644
--- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -460,18 +460,23 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) {
checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes());
CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks");
- if (checkpoint.GotAllAcknowledges()) {
+ } else {
+ checkpoint.Abort(ev->Sender);
+ CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
+ ++*Metrics.StorageError;
+ }
+ if (checkpoint.GotAllAcknowledges()) {
+ if (checkpoint.GetStats().Aborted) {
+ CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
+ CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
+ Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
+ } else {
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
if (InitingZeroCheckpoint) {
Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone());
}
}
- } else {
- CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
- ++*Metrics.StorageError;
- CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
- Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
}
}
diff --git a/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp b/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp
index 25deb30c549..4852226bab3 100644
--- a/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp
+++ b/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp
@@ -20,6 +20,11 @@ void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stat
Stats.StateSize += stateSize;
}
+void TPendingCheckpoint::Abort(const NActors::TActorId& actorId) {
+ Acknowledge(actorId);
+ Stats.Aborted = true;
+}
+
bool TPendingCheckpoint::GotAllAcknowledges() const {
return NotYetAcknowledged.empty();
}
diff --git a/ydb/core/fq/libs/checkpointing/pending_checkpoint.h b/ydb/core/fq/libs/checkpointing/pending_checkpoint.h
index 3b01fff61d6..d7b530f0ed6 100644
--- a/ydb/core/fq/libs/checkpointing/pending_checkpoint.h
+++ b/ydb/core/fq/libs/checkpointing/pending_checkpoint.h
@@ -10,6 +10,7 @@ namespace NFq {
struct TPendingCheckpointStats {
const TInstant CreatedAt = TInstant::Now();
ui64 StateSize = 0;
+ bool Aborted = false;
};
class TPendingCheckpoint {
@@ -27,6 +28,8 @@ public:
void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);
+ void Abort(const NActors::TActorId& actorId);
+
[[nodiscard]]
bool GotAllAcknowledges() const;
diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
index 255ea344c13..bab3c2aec96 100644
--- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
+++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
@@ -417,6 +417,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
void SaveFailed(TCheckpointId checkpointId) {
MockNodeStateSavedEvent(checkpointId, IngressActor);
MockNodeStateSaveFailedEvent(checkpointId, EgressActor);
+ MockNodeStateSaveFailedEvent(checkpointId, MapActor);
Cerr << "Waiting for TEvAbortCheckpointRequest (storage)" << Endl;
ExpectEvent(StorageProxy,