diff options
| author | Hor911 <[email protected]> | 2025-06-20 08:03:39 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-06-20 08:03:39 +0300 |
| commit | ecbf05b8dc527080f08b026054c592597da8cda7 (patch) | |
| tree | 49d70ac86708615de808e53fcd6800539dffbf82 | |
| parent | 7690c67ca855da4204b67e7a2d4716f84c4ed4e3 (diff) | |
Fail ScanFetcher on Registration (with CA) Timeout (#19900)
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 21 | ||||
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h | 2 |
2 files changed, 20 insertions, 3 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index f34da8ee931..afde096a348 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -21,6 +21,9 @@ static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000 static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20; static constexpr ui64 MAX_SHARD_RESOLVES = 3; +constexpr TDuration REGISTRATION_TIMEOUT = TDuration::Seconds(60); +constexpr TDuration PING_PERIOD = TDuration::Seconds(30); + } // anonymous namespace TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings, @@ -79,16 +82,18 @@ void TKqpScanFetcherActor::Bootstrap() { auto& state = PendingShards.emplace_back(TShardState(read.GetShardId())); state.Ranges = BuildSerializedTableRanges(read); } + RegistrationStartTime = Now(); for (auto&& c : ComputeActorIds) { Sender<TEvScanExchange::TEvRegisterFetcher>().SendTo(c); } AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size()); StartTableScan(); Become(&TKqpScanFetcherActor::StateFunc); - Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup()); + Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup()); } void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) { + RegistrationFinished = true; AFL_ENSURE(ev->Get()->GetFreeSpace()); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId)( "packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)("shards remain", PendingShards.size())( @@ -697,8 +702,18 @@ void TKqpScanFetcherActor::CheckFinish() { } void TKqpScanFetcherActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr&) { - InFlightShards.PingAllScanners(); - Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup()); + if (RegistrationFinished) { + InFlightShards.PingAllScanners(); + } else if (Now() - RegistrationStartTime > REGISTRATION_TIMEOUT) { + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvWakeup")("info", "Abort fetcher due to Registration timeout"); + InFlightShards.AbortAllScanners("Abort fetcher due to Registration timeout"); + TIssues issues; + issues.AddIssue(TIssue("Abort fetcher due to Registration timeout")); + SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues); + PassAway(); + return; + } + Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup()); } } // namespace NKikimr::NKqp::NScanPrivate diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index 67a4721115b..90b8bc29299 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -189,6 +189,8 @@ private: std::set<ui32> TrackingNodes; ui32 MaxInFlight = 1024; bool IsAggregationRequest = false; + bool RegistrationFinished = false; + TInstant RegistrationStartTime; }; } |
