diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 15:18:54 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 18:21:02 +0300 |
commit | 98010c252a85c8e918797c33e403564328c224cc (patch) | |
tree | 2c3247ffbc613afa2ab140e46d57530009547afa | |
parent | 8bdf165418cc7acd5e30eab6a3b39486395744c6 (diff) | |
download | ydb-98010c252a85c8e918797c33e403564328c224cc.tar.gz |
KIKIMR-19852: fix tablets moving case
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 32 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 19 |
2 files changed, 30 insertions, 21 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 9759c12583..ff62c983fc 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -91,7 +91,7 @@ public: void FinishWaitSendData() { --DataChunksInFlightCount; AFL_VERIFY(DataChunksInFlightCount >= 0); - if (!DataChunksInFlightCount) { + if (!DataChunksInFlightCount && !!ActorId) { DoAck(); } } @@ -251,8 +251,6 @@ public: bool RestartScanner(TShardState& state) { StopScanner(state.TabletId, false); - state.State = NComputeActor::EShardState::Initial; - state.ActorId = {}; state.ResetRetry(); static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000, 2000 if (++state.TotalRetries >= MAX_SHARD_RETRIES) { @@ -278,7 +276,7 @@ public: const std::shared_ptr<TShardState>& GetShardStateVerified(const ui64 tabletId) const { auto it = Shards.find(tabletId); - AFL_VERIFY(it != Shards.end())("tablet_id", tabletId); + AFL_VERIFY(it != Shards.end()); return it->second; } @@ -292,8 +290,11 @@ public: } void RegisterScannerActor(const ui64 tabletId, const TActorId& scanActorId) { - auto state = GetShardStateVerified(tabletId); - AFL_VERIFY(state->State == NComputeActor::EShardState::Starting); + auto state = GetShardState(tabletId); + if (!state) { + return; + } + AFL_VERIFY(state->State == NComputeActor::EShardState::Starting)("state", state->State); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "RegisterScannerActor")("actor_id", scanActorId) ("state", state->State)("tabletId", state->TabletId); @@ -318,14 +319,23 @@ public: void StopScanner(const ui64 tabletId, const bool stopShard = true) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "scanner_finished")("tablet_id", tabletId); - const auto actorId = GetShardStateVerified(tabletId)->ActorId; + auto& state = GetShardStateVerified(tabletId); + const auto actorId = state->ActorId; if (actorId) { AFL_VERIFY(ShardsByActorId.erase(actorId)); } - auto it = ShardScanners.find(tabletId); - AFL_VERIFY(it != ShardScanners.end()); - it->second->Stop(true, ""); - ShardScanners.erase(it); + if (state->State != NComputeActor::EShardState::Initial) { + state->RetryAttempt++; + state->TotalRetries++; + state->ActorId = {}; + state->State = NComputeActor::EShardState::Initial; + state->SubscribedOnTablet = false; + + auto it = ShardScanners.find(tabletId); + AFL_VERIFY(it != ShardScanners.end()); + it->second->Stop(true, ""); + ShardScanners.erase(it); + } if (stopShard) { AFL_VERIFY(Shards.erase(tabletId)); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 6a2539abb7..d1b57b7937 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -117,7 +117,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { if (!state) { return; } - AFL_VERIFY(state->State == EShardState::Running); + AFL_VERIFY(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender); TInstant startTime = TActivationContext::Now(); if (ev->Get()->Finished) { @@ -178,7 +178,10 @@ void TKqpScanFetcherActor::HandleExecute(TEvPipeCache::TEvDeliveryProblem::TPtr& } auto& msg = *ev->Get(); - auto state = InFlightShards.GetShardStateVerified(msg.TabletId); + auto state = InFlightShards.GetShardState(msg.TabletId); + if (!state) { + return; + } const auto shardState = state->State; CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << shardState); @@ -192,7 +195,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) { return; } auto state = InFlightShards.GetShardStateVerified(ev->Get()->TabletId); - InFlightShards.RestartScanner(*state); + InFlightShards.StartScanner(*state); } void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { @@ -513,6 +516,7 @@ void TKqpScanFetcherActor::StartTableScan() { } void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) { + InFlightShards.StopScanner(state->TabletId, false); Counters->ScanQueryShardDisconnect->Inc(); if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) { @@ -528,17 +532,12 @@ void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) { // so after several consecutive delivery problem responses retry logic should // resolve shard details again. if (state->RetryAttempt >= MAX_SHARD_RETRIES) { + state->ResetRetry(); Send(state->ActorId, new NActors::TEvents::TEvPoisonPill()); - return ResolveShard(*state); + return EnqueueResolveShard(state); } ++TotalRetries; - - state->RetryAttempt++; - state->TotalRetries++; - state->ActorId = {}; - state->State = EShardState::Initial; - state->SubscribedOnTablet = false; auto retryDelay = state->CalcRetryDelay(); CA_LOG_W("TKqpScanFetcherActor: broken pipe with tablet " << state->TabletId << ", restarting scan from last received key " << state->PrintLastKey(KeyColumnTypes) |