aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-02-21 22:49:46 +0300
committerGitHub <noreply@github.com>2025-02-21 22:49:46 +0300
commit1041280c2db74f6cbd14866c053f5472142647ad (patch)
tree577b66ebda552f29767dd52eb1a87af46d81aed9
parentef68464d3757a3ac9f4c6b015bbc29e364b7415a (diff)
downloadydb-1041280c2db74f6cbd14866c053f5472142647ad.tar.gz
correct long requests abortion (#14888)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp1
4 files changed, 3 insertions, 8 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
index 70e76a9c91..b0e489fa74 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
@@ -125,10 +125,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
}
TConclusion<bool> TScanHead::BuildNextInterval() {
- if (Context->IsAborted()) {
- return false;
- }
- while (BorderPoints.size()) {
+ while (BorderPoints.size() && !Context->IsAborted()) {
if (BorderPoints.begin()->second.GetStartSources().size()) {
if (FetchingIntervals.size() >= InFlightLimit) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")(
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
index 162dc4dbc4..2a5394a76d 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
@@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::shared_ptr<IDataSource>& sourcePtr) {
AFL_VERIFY(FetchingPlan);
- AFL_VERIFY(!GetContext()->IsAborted());
if (!IsReadyFlag) {
AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
index 7251fb8d33..bab940881a 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
@@ -126,7 +126,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
}
bool changed = false;
if (!Context->GetCommonContext()->GetReadMetadata()->HasLimit()) {
- while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
+ while (SortedSources.size() && FetchingSources.size() < InFlightLimit && Context->IsActive()) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
@@ -139,7 +139,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
}
ui32 inFlightCountLocal = GetInFlightIntervalsCount();
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
- while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
+ while (SortedSources.size() && inFlightCountLocal < InFlightLimit && Context->IsActive()) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
index 5438f0a93f..124a08cadb 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
@@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) {
AFL_VERIFY(!ProcessingStarted);
AFL_VERIFY(FetchingPlan);
- AFL_VERIFY(!GetContext()->IsAborted());
ProcessingStarted = true;
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());