aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-21 11:51:06 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-21 11:51:06 +0300
commit96a6a371c53651ccfd7dfdc87fe354cf23de9b5c (patch)
tree049fedc34c2dfb4d75cb687169cf166ac74f56dd
parentb5a543fb685bd376a0022c6f5dbd804ae1795b58 (diff)
downloadydb-96a6a371c53651ccfd7dfdc87fe354cf23de9b5c.tar.gz
YQ-816 Fail graph instead of VERIFY in retry queue. Add diagnostic information to exceptions
Throw exceptions in retry queue Catch exceptions in state functions of coordinator and CA checkpoints ref:6f404fd96479aa92fabb207ab62a54d864290c98
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp6
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp14
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/retry_queue.cpp15
-rw-r--r--ydb/library/yql/dq/actors/compute/retry_queue.h13
7 files changed, 43 insertions, 17 deletions
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
index 6c976180dc..2df1b6ec33 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -582,6 +582,12 @@ void TCheckpointCoordinator::PassAway() {
TActorBootstrapped<TCheckpointCoordinator>::PassAway();
}
+void TCheckpointCoordinator::HandleException(const std::exception& err) {
+ NYql::TIssues issues;
+ issues.AddIssue(err.what());
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpoint coordinator", issues));
+}
+
THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) {
return MakeHolder<TCheckpointCoordinator>(coordinatorId, taskControllerId, storageProxy, runActorId, settings, counters, graphParams, stateLoadMode, streamingDisposition);
}
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
index 0bb76685e1..512a8a5d61 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
@@ -51,9 +51,10 @@ public:
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&);
+ void HandleException(const std::exception& err);
- STRICT_STFUNC(DispatchEvent,
+ STRICT_STFUNC_EXC(DispatchEvent,
hFunc(NYql::NDqs::TEvReadyState, Handle)
hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle)
hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle)
@@ -71,7 +72,8 @@ public:
hFunc(NActors::TEvents::TEvUndelivered, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle)
- hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle)
+ hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle),
+ ExceptionFunc(std::exception, HandleException)
)
void Bootstrap();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 9d943cbca3..4afc81a844 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -105,8 +105,9 @@ NDqProto::TComputeActorState CombineForeignState(
} // namespace
-TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor)
+TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor)
: TActor(&TDqComputeActorCheckpoints::StateFunc)
+ , Owner(owner)
, TxId(txId)
, Task(std::move(task))
, IngressTask(IsIngressTask(Task))
@@ -120,7 +121,7 @@ void TDqComputeActorCheckpoints::Init(NActors::TActorId computeActorId, NActors:
EventsQueue.Init(TxId, computeActorId, checkpointsId);
}
-STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc,
+STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc,
hFunc(TEvDqCompute::TEvNewCheckpointCoordinator, Handle);
hFunc(TEvDqCompute::TEvInjectCheckpoint, Handle);
hFunc(TEvDqCompute::TEvSaveTaskStateResult, Handle);
@@ -131,9 +132,16 @@ STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc,
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(TEvRetryQueuePrivate::TEvRetry, Handle);
- cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);,
+ ExceptionFunc(std::exception, HandleException)
)
+void TDqComputeActorCheckpoints::HandleException(const std::exception& err) {
+ NYql::TIssues issues;
+ issues.AddIssue(err.what());
+ Send(Owner, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpointing", issues));
+}
+
namespace {
// Get generation for protobuf event.
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index d57061f638..af0224678c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -83,7 +83,7 @@ public:
virtual ~ICallbacks() = default;
};
- TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor);
+ TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor);
void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId);
[[nodiscard]]
bool HasPendingCheckpoint() const;
@@ -110,6 +110,7 @@ private:
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
+ void HandleException(const std::exception& err);
void PassAway() override;
@@ -118,6 +119,7 @@ private:
bool ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture = true);
private:
+ const NActors::TActorId Owner;
const TTxId TxId;
const NDqProto::TDqTask Task;
const bool IngressTask;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 14d64c5a86..14c7352fec 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -904,7 +904,7 @@ protected:
void HandleExecuteBase(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) {
if (!Checkpoints) {
- Checkpoints = new TDqComputeActorCheckpoints(TxId, Task, this);
+ Checkpoints = new TDqComputeActorCheckpoints(this->SelfId(), TxId, Task, this);
Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints));
Channels->SetCheckpointsSupport();
}
diff --git a/ydb/library/yql/dq/actors/compute/retry_queue.cpp b/ydb/library/yql/dq/actors/compute/retry_queue.cpp
index 2539937691..e382f42017 100644
--- a/ydb/library/yql/dq/actors/compute/retry_queue.cpp
+++ b/ydb/library/yql/dq/actors/compute/retry_queue.cpp
@@ -73,14 +73,13 @@ void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) {
while (!Events.empty() && Events.front()->GetSeqNo() <= confirmedSeqNo) {
Events.pop_front();
}
- Y_VERIFY(Events.size() <= 10000,
- "Too many unconfirmed events: %lu. Confirmed SeqNo: %lu. Unconfirmed SeqNos: %lu-%lu. TxId: \"%s\". EventQueueId: %lu",
- Events.size(),
- confirmedSeqNo,
- Events.front()->GetSeqNo(),
- Events.back()->GetSeqNo(),
- (TStringBuilder() << TxId).c_str(),
- EventQueueId);
+ if (Events.size() > TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) {
+ throw yexception()
+ << "Too many unconfirmed events: " << Events.size()
+ << ". Confirmed SeqNo: " << confirmedSeqNo
+ << ". Unconfirmed SeqNos: " << Events.front()->GetSeqNo() << "-" << Events.back()->GetSeqNo()
+ << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId;
+ }
}
void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) {
diff --git a/ydb/library/yql/dq/actors/compute/retry_queue.h b/ydb/library/yql/dq/actors/compute/retry_queue.h
index 5900a8c986..8ec664cd5f 100644
--- a/ydb/library/yql/dq/actors/compute/retry_queue.h
+++ b/ydb/library/yql/dq/actors/compute/retry_queue.h
@@ -7,6 +7,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <util/generic/yexception.h>
#include <util/system/types.h>
namespace NYql::NDq {
@@ -33,6 +34,8 @@ struct TEvRetryQueuePrivate {
const ui64 EventQueueId;
};
+
+ static constexpr size_t UNCONFIRMED_EVENTS_COUNT_LIMIT = 10000;
};
template <class T>
@@ -106,7 +109,13 @@ public:
}
return true;
} else if (seqNo > MyConfirmedSeqNo) {
- Y_VERIFY(ReceivedEventsSeqNos.size() < 10000); // Too wide window.
+ if (ReceivedEventsSeqNos.size() > TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) {
+ throw yexception()
+ << "Too wide window of reordered events: " << ReceivedEventsSeqNos.size()
+ << ". MyConfirmedSeqNo: " << MyConfirmedSeqNo
+ << ". Received SeqNo: " << seqNo
+ << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId;
+ }
return ReceivedEventsSeqNos.insert(seqNo).second;
}
return false;
@@ -180,7 +189,7 @@ private:
bool LocalRecipient = false;
ui64 NextSeqNo = 1;
std::deque<IRetryableEvent::TPtr> Events;
- ui64 MyConfirmedSeqNo = 0; // Recceived events seq no border.
+ ui64 MyConfirmedSeqNo = 0; // Received events seq no border.
std::set<ui64> ReceivedEventsSeqNos;
bool Connected = false;
bool RetryScheduled = false;