summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHor911 <[email protected]>2025-06-20 08:03:39 +0300
committerGitHub <[email protected]>2025-06-20 08:03:39 +0300
commitecbf05b8dc527080f08b026054c592597da8cda7 (patch)
tree49d70ac86708615de808e53fcd6800539dffbf82
parent7690c67ca855da4204b67e7a2d4716f84c4ed4e3 (diff)
Fail ScanFetcher on Registration (with CA) Timeout (#19900)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp21
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h2
2 files changed, 20 insertions, 3 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index f34da8ee931..afde096a348 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -21,6 +21,9 @@ static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000
static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
+constexpr TDuration REGISTRATION_TIMEOUT = TDuration::Seconds(60);
+constexpr TDuration PING_PERIOD = TDuration::Seconds(30);
+
} // anonymous namespace
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings,
@@ -79,16 +82,18 @@ void TKqpScanFetcherActor::Bootstrap() {
auto& state = PendingShards.emplace_back(TShardState(read.GetShardId()));
state.Ranges = BuildSerializedTableRanges(read);
}
+ RegistrationStartTime = Now();
for (auto&& c : ComputeActorIds) {
Sender<TEvScanExchange::TEvRegisterFetcher>().SendTo(c);
}
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size());
StartTableScan();
Become(&TKqpScanFetcherActor::StateFunc);
- Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
+ Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
}
void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) {
+ RegistrationFinished = true;
AFL_ENSURE(ev->Get()->GetFreeSpace());
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId)(
"packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)("shards remain", PendingShards.size())(
@@ -697,8 +702,18 @@ void TKqpScanFetcherActor::CheckFinish() {
}
void TKqpScanFetcherActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr&) {
- InFlightShards.PingAllScanners();
- Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
+ if (RegistrationFinished) {
+ InFlightShards.PingAllScanners();
+ } else if (Now() - RegistrationStartTime > REGISTRATION_TIMEOUT) {
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvWakeup")("info", "Abort fetcher due to Registration timeout");
+ InFlightShards.AbortAllScanners("Abort fetcher due to Registration timeout");
+ TIssues issues;
+ issues.AddIssue(TIssue("Abort fetcher due to Registration timeout"));
+ SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues);
+ PassAway();
+ return;
+ }
+ Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
}
} // namespace NKikimr::NKqp::NScanPrivate
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index 67a4721115b..90b8bc29299 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -189,6 +189,8 @@ private:
std::set<ui32> TrackingNodes;
ui32 MaxInFlight = 1024;
bool IsAggregationRequest = false;
+ bool RegistrationFinished = false;
+ TInstant RegistrationStartTime;
};
}