aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-11-22 15:18:54 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-11-22 18:21:02 +0300
commit98010c252a85c8e918797c33e403564328c224cc (patch)
tree2c3247ffbc613afa2ab140e46d57530009547afa
parent8bdf165418cc7acd5e30eab6a3b39486395744c6 (diff)
downloadydb-98010c252a85c8e918797c33e403564328c224cc.tar.gz
KIKIMR-19852: fix tablets moving case
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h32
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp19
2 files changed, 30 insertions, 21 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index 9759c12583..ff62c983fc 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -91,7 +91,7 @@ public:
void FinishWaitSendData() {
--DataChunksInFlightCount;
AFL_VERIFY(DataChunksInFlightCount >= 0);
- if (!DataChunksInFlightCount) {
+ if (!DataChunksInFlightCount && !!ActorId) {
DoAck();
}
}
@@ -251,8 +251,6 @@ public:
bool RestartScanner(TShardState& state) {
StopScanner(state.TabletId, false);
- state.State = NComputeActor::EShardState::Initial;
- state.ActorId = {};
state.ResetRetry();
static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000, 2000
if (++state.TotalRetries >= MAX_SHARD_RETRIES) {
@@ -278,7 +276,7 @@ public:
const std::shared_ptr<TShardState>& GetShardStateVerified(const ui64 tabletId) const {
auto it = Shards.find(tabletId);
- AFL_VERIFY(it != Shards.end())("tablet_id", tabletId);
+ AFL_VERIFY(it != Shards.end());
return it->second;
}
@@ -292,8 +290,11 @@ public:
}
void RegisterScannerActor(const ui64 tabletId, const TActorId& scanActorId) {
- auto state = GetShardStateVerified(tabletId);
- AFL_VERIFY(state->State == NComputeActor::EShardState::Starting);
+ auto state = GetShardState(tabletId);
+ if (!state) {
+ return;
+ }
+ AFL_VERIFY(state->State == NComputeActor::EShardState::Starting)("state", state->State);
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "RegisterScannerActor")("actor_id", scanActorId)
("state", state->State)("tabletId", state->TabletId);
@@ -318,14 +319,23 @@ public:
void StopScanner(const ui64 tabletId, const bool stopShard = true) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "scanner_finished")("tablet_id", tabletId);
- const auto actorId = GetShardStateVerified(tabletId)->ActorId;
+ auto& state = GetShardStateVerified(tabletId);
+ const auto actorId = state->ActorId;
if (actorId) {
AFL_VERIFY(ShardsByActorId.erase(actorId));
}
- auto it = ShardScanners.find(tabletId);
- AFL_VERIFY(it != ShardScanners.end());
- it->second->Stop(true, "");
- ShardScanners.erase(it);
+ if (state->State != NComputeActor::EShardState::Initial) {
+ state->RetryAttempt++;
+ state->TotalRetries++;
+ state->ActorId = {};
+ state->State = NComputeActor::EShardState::Initial;
+ state->SubscribedOnTablet = false;
+
+ auto it = ShardScanners.find(tabletId);
+ AFL_VERIFY(it != ShardScanners.end());
+ it->second->Stop(true, "");
+ ShardScanners.erase(it);
+ }
if (stopShard) {
AFL_VERIFY(Shards.erase(tabletId));
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index 6a2539abb7..d1b57b7937 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -117,7 +117,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
if (!state) {
return;
}
- AFL_VERIFY(state->State == EShardState::Running);
+ AFL_VERIFY(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);
TInstant startTime = TActivationContext::Now();
if (ev->Get()->Finished) {
@@ -178,7 +178,10 @@ void TKqpScanFetcherActor::HandleExecute(TEvPipeCache::TEvDeliveryProblem::TPtr&
}
auto& msg = *ev->Get();
- auto state = InFlightShards.GetShardStateVerified(msg.TabletId);
+ auto state = InFlightShards.GetShardState(msg.TabletId);
+ if (!state) {
+ return;
+ }
const auto shardState = state->State;
CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << shardState);
@@ -192,7 +195,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) {
return;
}
auto state = InFlightShards.GetShardStateVerified(ev->Get()->TabletId);
- InFlightShards.RestartScanner(*state);
+ InFlightShards.StartScanner(*state);
}
void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
@@ -513,6 +516,7 @@ void TKqpScanFetcherActor::StartTableScan() {
}
void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) {
+ InFlightShards.StopScanner(state->TabletId, false);
Counters->ScanQueryShardDisconnect->Inc();
if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
@@ -528,17 +532,12 @@ void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) {
// so after several consecutive delivery problem responses retry logic should
// resolve shard details again.
if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
+ state->ResetRetry();
Send(state->ActorId, new NActors::TEvents::TEvPoisonPill());
- return ResolveShard(*state);
+ return EnqueueResolveShard(state);
}
++TotalRetries;
-
- state->RetryAttempt++;
- state->TotalRetries++;
- state->ActorId = {};
- state->State = EShardState::Initial;
- state->SubscribedOnTablet = false;
auto retryDelay = state->CalcRetryDelay();
CA_LOG_W("TKqpScanFetcherActor: broken pipe with tablet " << state->TabletId
<< ", restarting scan from last received key " << state->PrintLastKey(KeyColumnTypes)