diff options
author | gvit <[email protected]> | 2023-02-02 17:19:10 +0300 |
---|---|---|
committer | gvit <[email protected]> | 2023-02-02 17:19:10 +0300 |
commit | fe63c73950e1332e11540093b835b8f86093f073 (patch) | |
tree | 7793cf54e038b29f07d825f0d0d488badff275bb | |
parent | cc87dda0b50405f851e1ad4a069822969d7d1eea (diff) |
fix the bug
4 files changed, 12 insertions, 3 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 9beb26a3fa8..8217876ca2b 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -626,6 +626,7 @@ private: if (state->State == EShardState::PostRunning || state->State == EShardState::Running) { state->State = EShardState::Initial; state->ActorId = {}; + InFlightShards.ClearAckState(state); state->ResetRetry(); return StartReadShard(state); } @@ -954,6 +955,7 @@ private: return ResolveShard(*state); } + InFlightShards.ClearAckState(state); state->RetryAttempt++; state->TotalRetries++; state->Generation = InFlightShards.AllocateGeneration(state); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp index 5a6cb6efe2f..f90c45f6384 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -139,4 +139,4 @@ TShardCostsState::TPtr TInFlightShards::GetCostsState(const ui64 shardId) const }
}
-}
\ No newline at end of file +}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 14dd5a02a70..9b5be702931 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -59,6 +59,13 @@ public: return NeedAckStates;
}
+ void ClearAckState(TShardState::TPtr state) {
+ auto it = NeedAckStates.find(state->ScannerIdx);
+ if (it != NeedAckStates.end()) {
+ NeedAckStates.erase(it);
+ }
+ }
+
void AckSent(TShardState::TPtr state) {
Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
NeedAckStates.erase(state->ScannerIdx);
@@ -117,4 +124,4 @@ public: }
};
-}
\ No newline at end of file +}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h index 3f6a09ebe98..92bf2bec92b 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h @@ -64,4 +64,4 @@ public: TString GetDurationStats() const;
};
-}
\ No newline at end of file +}
|