aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-01-11 13:44:53 +0300
committerGitHub <noreply@github.com>2025-01-11 13:44:53 +0300
commitc5eecd55a3238f47724a7587962efa01b8f29325 (patch)
tree51d4482a26962e7f61b10bd6a819b871bc67a7e8
parent2b30a702fc287a4e9b738cd7aaea27fb1b4a5d4c (diff)
downloadydb-c5eecd55a3238f47724a7587962efa01b8f29325.tar.gz
scanner simple speed up (#13281)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h2
-rw-r--r--ydb/library/accessor/positive_integer.cpp8
-rw-r--r--ydb/library/accessor/positive_integer.h8
4 files changed, 53 insertions, 10 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
index bc4e34df7f1..95de1b6a12a 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
@@ -18,9 +18,6 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
Context->GetCommonContext()->GetCounters().OnSourceFinished(
source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0);
- if ((!tableExt || !tableExt->num_rows()) && Context->GetCommonContext()->GetReadMetadata()->HasLimit() && InFlightLimit < MaxInFlight) {
- InFlightLimit = 2 * InFlightLimit;
- }
source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount);
while (FetchingSources.size()) {
auto frontSource = FetchingSources.front();
@@ -59,13 +56,18 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
FetchingSources.pop_front();
frontSource->ClearResult();
- if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && FetchingSources.size() && frontSource->GetResultRecordsCount()) {
- FinishedSources.emplace(frontSource);
- while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < FetchingSources.front()->GetStart()) {
- auto fetchingSource = FetchingSources.front();
+ if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && SortedSources.size() && frontSource->GetResultRecordsCount()) {
+ AFL_VERIFY(FetchingInFlightSources.erase(frontSource));
+ AFL_VERIFY(FinishedSources.emplace(frontSource).second);
+ while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) {
auto finishedSource = *FinishedSources.begin();
+ if (!finishedSource->GetResultRecordsCount() && Context->GetCommonContext()->GetReadMetadata()->HasLimit() &&
+ InFlightLimit < MaxInFlight) {
+ InFlightLimit = 2 * InFlightLimit;
+ }
FetchedCount += finishedSource->GetResultRecordsCount();
FinishedSources.erase(FinishedSources.begin());
+ --IntervalsInFlightCount;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource->GetSourceId())(
"source_idx", finishedSource->GetSourceIdx())("limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())(
"fetched", finishedSource->GetResultRecordsCount());
@@ -119,14 +121,41 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
if (!Context->IsActive()) {
return false;
}
+ if (InFlightLimit <= IntervalsInFlightCount) {
+ return false;
+ }
+ if (SortedSources.size() == 0) {
+ return false;
+ }
bool changed = false;
- while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
+ ui32 inFlightCountLocal = 0;
+ if (SortedSources.size()) {
+ for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
+ if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
+ ++inFlightCountLocal;
+ }
+ }
+ }
+ AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
+ while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front());
+ AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second);
SortedSources.pop_front();
+ if (SortedSources.size()) {
+ ui32 inFlightCountLocalNew = 0;
+ for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
+ if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
+ ++inFlightCountLocalNew;
+ }
+ }
+ AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);
+ inFlightCountLocal = inFlightCountLocalNew;
+ }
changed = true;
}
+ IntervalsInFlightCount = inFlightCountLocal;
return changed;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
index c60e2d436ee..08df0906f2a 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
@@ -29,6 +29,8 @@ private:
std::deque<std::shared_ptr<IDataSource>> SortedSources;
std::deque<std::shared_ptr<IDataSource>> FetchingSources;
std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FinishedSources;
+ std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FetchingInFlightSources;
+ TPositiveControlInteger IntervalsInFlightCount;
ui64 FetchedCount = 0;
ui64 InFlightLimit = 1;
ui64 MaxInFlight = 256;
diff --git a/ydb/library/accessor/positive_integer.cpp b/ydb/library/accessor/positive_integer.cpp
index b83845e640b..29723a1c097 100644
--- a/ydb/library/accessor/positive_integer.cpp
+++ b/ydb/library/accessor/positive_integer.cpp
@@ -35,4 +35,10 @@ ui64 TPositiveControlInteger::Val() const {
return Value;
}
-} \ No newline at end of file
+}
+
+template<>
+void Out<NKikimr::TPositiveControlInteger>(IOutputStream& o,
+ typename TTypeTraits<NKikimr::TPositiveControlInteger>::TFuncParam x) {
+ o << x.Val();
+}
diff --git a/ydb/library/accessor/positive_integer.h b/ydb/library/accessor/positive_integer.h
index 207d19b1ef2..a7cfab955ef 100644
--- a/ydb/library/accessor/positive_integer.h
+++ b/ydb/library/accessor/positive_integer.h
@@ -1,5 +1,7 @@
#pragma once
#include <util/system/types.h>
+#include <util/stream/output.h>
+#include <util/generic/typetraits.h>
namespace NKikimr {
@@ -12,6 +14,10 @@ public:
: Value(value) {
}
+ TPositiveControlInteger(const ui32 value)
+ : Value(value) {
+
+ }
TPositiveControlInteger(const i64 value);
ui64 Add(const ui64 value);
ui64 Sub(const ui64 value);
@@ -37,4 +43,4 @@ public:
}
};
-} \ No newline at end of file
+}