diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-01-11 13:44:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-11 13:44:53 +0300 |
commit | c5eecd55a3238f47724a7587962efa01b8f29325 (patch) | |
tree | 51d4482a26962e7f61b10bd6a819b871bc67a7e8 | |
parent | 2b30a702fc287a4e9b738cd7aaea27fb1b4a5d4c (diff) | |
download | ydb-c5eecd55a3238f47724a7587962efa01b8f29325.tar.gz |
scanner simple speed up (#13281)
4 files changed, 53 insertions, 10 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index bc4e34df7f1..95de1b6a12a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -18,9 +18,6 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s Context->GetCommonContext()->GetCounters().OnSourceFinished( source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0); - if ((!tableExt || !tableExt->num_rows()) && Context->GetCommonContext()->GetReadMetadata()->HasLimit() && InFlightLimit < MaxInFlight) { - InFlightLimit = 2 * InFlightLimit; - } source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount); while (FetchingSources.size()) { auto frontSource = FetchingSources.front(); @@ -59,13 +56,18 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx())); FetchingSources.pop_front(); frontSource->ClearResult(); - if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && FetchingSources.size() && frontSource->GetResultRecordsCount()) { - FinishedSources.emplace(frontSource); - while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < FetchingSources.front()->GetStart()) { - auto fetchingSource = FetchingSources.front(); + if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && SortedSources.size() && frontSource->GetResultRecordsCount()) { + AFL_VERIFY(FetchingInFlightSources.erase(frontSource)); + AFL_VERIFY(FinishedSources.emplace(frontSource).second); + while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) { auto finishedSource = *FinishedSources.begin(); + if (!finishedSource->GetResultRecordsCount() && Context->GetCommonContext()->GetReadMetadata()->HasLimit() && + InFlightLimit < MaxInFlight) { + InFlightLimit = 2 * InFlightLimit; + } FetchedCount += finishedSource->GetResultRecordsCount(); FinishedSources.erase(FinishedSources.begin()); + --IntervalsInFlightCount; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource->GetSourceId())( "source_idx", finishedSource->GetSourceIdx())("limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())( "fetched", finishedSource->GetResultRecordsCount()); @@ -119,14 +121,41 @@ TConclusion<bool> TScanHead::BuildNextInterval() { if (!Context->IsActive()) { return false; } + if (InFlightLimit <= IntervalsInFlightCount) { + return false; + } + if (SortedSources.size() == 0) { + return false; + } bool changed = false; - while (SortedSources.size() && FetchingSources.size() < InFlightLimit) { + ui32 inFlightCountLocal = 0; + if (SortedSources.size()) { + for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) { + if ((*it)->GetFinish() < SortedSources.front()->GetStart()) { + ++inFlightCountLocal; + } + } + } + AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal); + while (SortedSources.size() && inFlightCountLocal < InFlightLimit) { SortedSources.front()->StartProcessing(SortedSources.front()); FetchingSources.emplace_back(SortedSources.front()); FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()); + AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second); SortedSources.pop_front(); + if (SortedSources.size()) { + ui32 inFlightCountLocalNew = 0; + for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) { + if ((*it)->GetFinish() < SortedSources.front()->GetStart()) { + ++inFlightCountLocalNew; + } + } + AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew); + inFlightCountLocal = inFlightCountLocalNew; + } changed = true; } + IntervalsInFlightCount = inFlightCountLocal; return changed; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h index c60e2d436ee..08df0906f2a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h @@ -29,6 +29,8 @@ private: std::deque<std::shared_ptr<IDataSource>> SortedSources; std::deque<std::shared_ptr<IDataSource>> FetchingSources; std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FinishedSources; + std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FetchingInFlightSources; + TPositiveControlInteger IntervalsInFlightCount; ui64 FetchedCount = 0; ui64 InFlightLimit = 1; ui64 MaxInFlight = 256; diff --git a/ydb/library/accessor/positive_integer.cpp b/ydb/library/accessor/positive_integer.cpp index b83845e640b..29723a1c097 100644 --- a/ydb/library/accessor/positive_integer.cpp +++ b/ydb/library/accessor/positive_integer.cpp @@ -35,4 +35,10 @@ ui64 TPositiveControlInteger::Val() const { return Value; } -}
\ No newline at end of file +} + +template<> +void Out<NKikimr::TPositiveControlInteger>(IOutputStream& o, + typename TTypeTraits<NKikimr::TPositiveControlInteger>::TFuncParam x) { + o << x.Val(); +} diff --git a/ydb/library/accessor/positive_integer.h b/ydb/library/accessor/positive_integer.h index 207d19b1ef2..a7cfab955ef 100644 --- a/ydb/library/accessor/positive_integer.h +++ b/ydb/library/accessor/positive_integer.h @@ -1,5 +1,7 @@ #pragma once #include <util/system/types.h> +#include <util/stream/output.h> +#include <util/generic/typetraits.h> namespace NKikimr { @@ -12,6 +14,10 @@ public: : Value(value) { } + TPositiveControlInteger(const ui32 value) + : Value(value) { + + } TPositiveControlInteger(const i64 value); ui64 Add(const ui64 value); ui64 Sub(const ui64 value); @@ -37,4 +43,4 @@ public: } }; -}
\ No newline at end of file +} |