aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-01-11 17:39:50 +0300
committerGitHub <noreply@github.com>2025-01-11 14:39:50 +0000
commit039db66470ba53cf942f3d1c57a26e4d0e7e33fe (patch)
treefcdd1c6af7dfd6497e857de889d9e393a81326dd
parentc5eecd55a3238f47724a7587962efa01b8f29325 (diff)
downloadydb-039db66470ba53cf942f3d1c57a26e4d0e7e33fe.tar.gz
fix scanner (#13292)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp8
1 files changed, 6 insertions, 2 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
index 95de1b6a12a..ab82cacef47 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
@@ -61,8 +61,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_VERIFY(FinishedSources.emplace(frontSource).second);
while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) {
auto finishedSource = *FinishedSources.begin();
- if (!finishedSource->GetResultRecordsCount() && Context->GetCommonContext()->GetReadMetadata()->HasLimit() &&
- InFlightLimit < MaxInFlight) {
+ if (!finishedSource->GetResultRecordsCount() && InFlightLimit < MaxInFlight) {
InFlightLimit = 2 * InFlightLimit;
}
FetchedCount += finishedSource->GetResultRecordsCount();
@@ -75,6 +74,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")(
"limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())("fetched", FetchedCount);
SortedSources.clear();
+ break;
}
}
}
@@ -133,6 +133,8 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
+ } else {
+ break;
}
}
}
@@ -148,6 +150,8 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocalNew;
+ } else {
+ break;
}
}
AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);