diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-02-21 22:49:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-21 22:49:46 +0300 |
commit | 1041280c2db74f6cbd14866c053f5472142647ad (patch) | |
tree | 577b66ebda552f29767dd52eb1a87af46d81aed9 | |
parent | ef68464d3757a3ac9f4c6b015bbc29e364b7415a (diff) | |
download | ydb-1041280c2db74f6cbd14866c053f5472142647ad.tar.gz |
correct long requests abortion (#14888)
4 files changed, 3 insertions, 8 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 70e76a9c91..b0e489fa74 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -125,10 +125,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s } TConclusion<bool> TScanHead::BuildNextInterval() { - if (Context->IsAborted()) { - return false; - } - while (BorderPoints.size()) { + while (BorderPoints.size() && !Context->IsAborted()) { if (BorderPoints.begin()->second.GetStartSources().size()) { if (FetchingIntervals.size() >= InFlightLimit) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")( diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 162dc4dbc4..2a5394a76d 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::shared_ptr<IDataSource>& sourcePtr) { AFL_VERIFY(FetchingPlan); - AFL_VERIFY(!GetContext()->IsAborted()); if (!IsReadyFlag) { AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index 7251fb8d33..bab940881a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -126,7 +126,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() { } bool changed = false; if (!Context->GetCommonContext()->GetReadMetadata()->HasLimit()) { - while (SortedSources.size() && FetchingSources.size() < InFlightLimit) { + while (SortedSources.size() && FetchingSources.size() < InFlightLimit && Context->IsActive()) { SortedSources.front()->StartProcessing(SortedSources.front()); FetchingSources.emplace_back(SortedSources.front()); AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second); @@ -139,7 +139,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() { } ui32 inFlightCountLocal = GetInFlightIntervalsCount(); AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal); - while (SortedSources.size() && inFlightCountLocal < InFlightLimit) { + while (SortedSources.size() && inFlightCountLocal < InFlightLimit && Context->IsActive()) { SortedSources.front()->StartProcessing(SortedSources.front()); FetchingSources.emplace_back(SortedSources.front()); AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 5438f0a93f..124a08cadb 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) { AFL_VERIFY(!ProcessingStarted); AFL_VERIFY(FetchingPlan); - AFL_VERIFY(!GetContext()->IsAborted()); ProcessingStarted = true; SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); |