diff options
| field | value | date |
|---|---|---|
| author | yumkam <[email protected]> | 2024-11-25 21:19:21 +0300 |
| committer | GitHub <[email protected]> | 2024-11-25 21:19:21 +0300 |
| commit | ec75229918d0b7357a38e7aa1e92556429ea58a7 (patch) | |
| tree | ade21498f8cf5847ccd02438784621dde914c686 | |
| parent | a925034597464d382de59e1e6c17f98c941d05f6 (diff) | |
checkpointing vs abort: wait for completion before aborting (#11896)
4 files changed, 20 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp index f1a00395452..6c2b5d9afd3 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp @@ -460,18 +460,23 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) { checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes()); CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks"); - if (checkpoint.GotAllAcknowledges()) { + } else { + checkpoint.Abort(ev->Sender); + CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint"); + ++*Metrics.StorageError; + } + if (checkpoint.GotAllAcknowledges()) { + if (checkpoint.GetStats().Aborted) { + CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage"); + CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot. + Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery); + } else { CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'"); Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery); if (InitingZeroCheckpoint) { Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone()); } } - } else { - CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint"); - ++*Metrics.StorageError; - CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot. 
- Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery); } } diff --git a/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp b/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp index 25deb30c549..4852226bab3 100644 --- a/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp +++ b/ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp @@ -20,6 +20,11 @@ void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stat Stats.StateSize += stateSize; } +void TPendingCheckpoint::Abort(const NActors::TActorId& actorId) { + Acknowledge(actorId); + Stats.Aborted = true; +} + bool TPendingCheckpoint::GotAllAcknowledges() const { return NotYetAcknowledged.empty(); } diff --git a/ydb/core/fq/libs/checkpointing/pending_checkpoint.h b/ydb/core/fq/libs/checkpointing/pending_checkpoint.h index 3b01fff61d6..d7b530f0ed6 100644 --- a/ydb/core/fq/libs/checkpointing/pending_checkpoint.h +++ b/ydb/core/fq/libs/checkpointing/pending_checkpoint.h @@ -10,6 +10,7 @@ namespace NFq { struct TPendingCheckpointStats { const TInstant CreatedAt = TInstant::Now(); ui64 StateSize = 0; + bool Aborted = false; }; class TPendingCheckpoint { @@ -27,6 +28,8 @@ public: void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize); + void Abort(const NActors::TActorId& actorId); + [[nodiscard]] bool GotAllAcknowledges() const; diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index 255ea344c13..bab3c2aec96 100644 --- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -417,6 +417,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { void SaveFailed(TCheckpointId checkpointId) { MockNodeStateSavedEvent(checkpointId, IngressActor); MockNodeStateSaveFailedEvent(checkpointId, EgressActor); + 
MockNodeStateSaveFailedEvent(checkpointId, MapActor); Cerr << "Waiting for TEvAbortCheckpointRequest (storage)" << Endl; ExpectEvent(StorageProxy, |
