summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <[email protected]>2023-02-02 17:19:10 +0300
committergvit <[email protected]>2023-02-02 17:19:10 +0300
commitfe63c73950e1332e11540093b835b8f86093f073 (patch)
tree7793cf54e038b29f07d825f0d0d488badff275bb
parentcc87dda0b50405f851e1ad4a069822969d7d1eea (diff)
fix the bug
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h9
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h2
4 files changed, 12 insertions, 3 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 9beb26a3fa8..8217876ca2b 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -626,6 +626,7 @@ private:
if (state->State == EShardState::PostRunning || state->State == EShardState::Running) {
state->State = EShardState::Initial;
state->ActorId = {};
+ InFlightShards.ClearAckState(state);
state->ResetRetry();
return StartReadShard(state);
}
@@ -954,6 +955,7 @@ private:
return ResolveShard(*state);
}
+ InFlightShards.ClearAckState(state);
state->RetryAttempt++;
state->TotalRetries++;
state->Generation = InFlightShards.AllocateGeneration(state);
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index 5a6cb6efe2f..f90c45f6384 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -139,4 +139,4 @@ TShardCostsState::TPtr TInFlightShards::GetCostsState(const ui64 shardId) const
}
}
-} \ No newline at end of file
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index 14dd5a02a70..9b5be702931 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -59,6 +59,13 @@ public:
return NeedAckStates;
}
+ void ClearAckState(TShardState::TPtr state) {
+ auto it = NeedAckStates.find(state->ScannerIdx);
+ if (it != NeedAckStates.end()) {
+ NeedAckStates.erase(it);
+ }
+ }
+
void AckSent(TShardState::TPtr state) {
Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
NeedAckStates.erase(state->ScannerIdx);
@@ -117,4 +124,4 @@ public:
}
};
-} \ No newline at end of file
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
index 3f6a09ebe98..92bf2bec92b 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
@@ -64,4 +64,4 @@ public:
TString GetDurationStats() const;
};
-} \ No newline at end of file
+}