diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-21 11:51:06 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-21 11:51:06 +0300 |
commit | 96a6a371c53651ccfd7dfdc87fe354cf23de9b5c (patch) | |
tree | 049fedc34c2dfb4d75cb687169cf166ac74f56dd | |
parent | b5a543fb685bd376a0022c6f5dbd804ae1795b58 (diff) | |
download | ydb-96a6a371c53651ccfd7dfdc87fe354cf23de9b5c.tar.gz |
YQ-816 Fail graph instead of VERIFY in retry queue. Add diagnostic information to exceptions
Throw exceptions in retry queue
Catch exceptions in state functions of coordinator and CA checkpoints
ref:6f404fd96479aa92fabb207ab62a54d864290c98
7 files changed, 43 insertions, 17 deletions
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index 6c976180dc..2df1b6ec33 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -582,6 +582,12 @@ void TCheckpointCoordinator::PassAway() { TActorBootstrapped<TCheckpointCoordinator>::PassAway(); } +void TCheckpointCoordinator::HandleException(const std::exception& err) { + NYql::TIssues issues; + issues.AddIssue(err.what()); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpoint coordinator", issues)); +} + THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) { return MakeHolder<TCheckpointCoordinator>(coordinatorId, taskControllerId, storageProxy, runActorId, settings, counters, graphParams, stateLoadMode, streamingDisposition); } diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h index 0bb76685e1..512a8a5d61 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h @@ -51,9 +51,10 @@ public: void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&); + void HandleException(const std::exception& err); - STRICT_STFUNC(DispatchEvent, + STRICT_STFUNC_EXC(DispatchEvent, hFunc(NYql::NDqs::TEvReadyState, Handle) hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle) hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle) @@ -71,7 +72,8 @@ public: hFunc(NActors::TEvents::TEvUndelivered, Handle) hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle) hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle) - hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle) + hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle), + ExceptionFunc(std::exception, HandleException) ) void Bootstrap(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index 9d943cbca3..4afc81a844 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -105,8 +105,9 @@ NDqProto::TComputeActorState CombineForeignState( } // namespace -TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor) +TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor) : TActor(&TDqComputeActorCheckpoints::StateFunc) + , Owner(owner) , TxId(txId) , Task(std::move(task)) , IngressTask(IsIngressTask(Task)) @@ -120,7 +121,7 @@ void TDqComputeActorCheckpoints::Init(NActors::TActorId computeActorId, NActors: EventsQueue.Init(TxId, computeActorId, checkpointsId); } -STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc, +STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc, hFunc(TEvDqCompute::TEvNewCheckpointCoordinator, Handle); hFunc(TEvDqCompute::TEvInjectCheckpoint, Handle); hFunc(TEvDqCompute::TEvSaveTaskStateResult, Handle); @@ -131,9 +132,16 @@ STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc, hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle); hFunc(TEvRetryQueuePrivate::TEvRetry, Handle); - cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway);, + ExceptionFunc(std::exception, HandleException) ) +void TDqComputeActorCheckpoints::HandleException(const std::exception& err) { + NYql::TIssues issues; + issues.AddIssue(err.what()); + Send(Owner, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpointing", issues)); +} + namespace { // Get generation for protobuf event. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index d57061f638..af0224678c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -83,7 +83,7 @@ public: virtual ~ICallbacks() = default; }; - TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor); + TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor); void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId); [[nodiscard]] bool HasPendingCheckpoint() const; @@ -110,6 +110,7 @@ private: void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev); + void HandleException(const std::exception& err); void PassAway() override; @@ -118,6 +119,7 @@ private: bool ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture = true); private: + const NActors::TActorId Owner; const TTxId TxId; const NDqProto::TDqTask Task; const bool IngressTask; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 14d64c5a86..14c7352fec 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -904,7 +904,7 @@ protected: void HandleExecuteBase(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) { if (!Checkpoints) { - Checkpoints = new TDqComputeActorCheckpoints(TxId, Task, this); + Checkpoints = new TDqComputeActorCheckpoints(this->SelfId(), TxId, Task, this); Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints)); Channels->SetCheckpointsSupport(); } diff --git a/ydb/library/yql/dq/actors/compute/retry_queue.cpp b/ydb/library/yql/dq/actors/compute/retry_queue.cpp index 2539937691..e382f42017 100644 --- a/ydb/library/yql/dq/actors/compute/retry_queue.cpp +++ b/ydb/library/yql/dq/actors/compute/retry_queue.cpp @@ -73,14 +73,13 @@ void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) { while (!Events.empty() && Events.front()->GetSeqNo() <= confirmedSeqNo) { Events.pop_front(); } - Y_VERIFY(Events.size() <= 10000, - "Too many unconfirmed events: %lu. Confirmed SeqNo: %lu. Unconfirmed SeqNos: %lu-%lu. TxId: \"%s\". EventQueueId: %lu", - Events.size(), - confirmedSeqNo, - Events.front()->GetSeqNo(), - Events.back()->GetSeqNo(), - (TStringBuilder() << TxId).c_str(), - EventQueueId); + if (Events.size() > TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) { + throw yexception() + << "Too many unconfirmed events: " << Events.size() + << ". Confirmed SeqNo: " << confirmedSeqNo + << ". Unconfirmed SeqNos: " << Events.front()->GetSeqNo() << "-" << Events.back()->GetSeqNo() + << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId; + } } void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) { diff --git a/ydb/library/yql/dq/actors/compute/retry_queue.h b/ydb/library/yql/dq/actors/compute/retry_queue.h index 5900a8c986..8ec664cd5f 100644 --- a/ydb/library/yql/dq/actors/compute/retry_queue.h +++ b/ydb/library/yql/dq/actors/compute/retry_queue.h @@ -7,6 +7,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/interconnect.h> +#include <util/generic/yexception.h> #include <util/system/types.h> namespace NYql::NDq { @@ -33,6 +34,8 @@ struct TEvRetryQueuePrivate { const ui64 EventQueueId; }; + + static constexpr size_t UNCONFIRMED_EVENTS_COUNT_LIMIT = 10000; }; template <class T> @@ -106,7 +109,13 @@ public: } return true; } else if (seqNo > MyConfirmedSeqNo) { - Y_VERIFY(ReceivedEventsSeqNos.size() < 10000); // Too wide window. + if (ReceivedEventsSeqNos.size() > TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) { + throw yexception() + << "Too wide window of reordered events: " << ReceivedEventsSeqNos.size() + << ". MyConfirmedSeqNo: " << MyConfirmedSeqNo + << ". Received SeqNo: " << seqNo + << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId; + } return ReceivedEventsSeqNos.insert(seqNo).second; } return false; @@ -180,7 +189,7 @@ private: bool LocalRecipient = false; ui64 NextSeqNo = 1; std::deque<IRetryableEvent::TPtr> Events; - ui64 MyConfirmedSeqNo = 0; // Recceived events seq no border. + ui64 MyConfirmedSeqNo = 0; // Received events seq no border. std::set<ui64> ReceivedEventsSeqNos; bool Connected = false; bool RetryScheduled = false; |