aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-12-02 09:57:54 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-12-02 10:43:00 +0300
commite524bc608e30bc4fcfd01e857cb1df5d4a23b530 (patch)
treec76fc614ecb56e9d0385c496bbad6dedf203dbde
parent5bb5fa1eafe0aa2c8a8495c5e76079d7a507d697 (diff)
downloadydb-e524bc608e30bc4fcfd01e857cb1df5d4a23b530.tar.gz
KIKIMR-19880: dont read all immediately if request has limit (q23 1.4s->200ms)
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.h4
6 files changed, 30 insertions, 6 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 79349592d1..a2d4dc7bb8 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -43,7 +43,7 @@ void TColumnShardScanIterator::FillReadyResults() {
}
if (limitLeft == 0) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead);
IndexedData->Abort();
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 42f17fb44c..e98255ef00 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -166,12 +166,13 @@ TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPositio
}
void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
- OnInitResourcesGuard(guard);
+ AFL_VERIFY(guard);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated")
("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size());
for (auto&& [_, i] : Sources) {
i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i);
}
+ OnInitResourcesGuard(guard);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index 3f0a818c74..b80813b1d8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -6,6 +6,14 @@
namespace NKikimr::NOlap::NPlainReader {
void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) {
+ if (Context->GetReadMetadata()->Limit && (!newBatch || newBatch->GetRecordsCount() == 0) && InFlightLimit < 1000) {
+ if (++ZeroCount == std::max<ui64>(16, InFlightLimit)) {
+ InFlightLimit *= 2;
+ ZeroCount = 0;
+ }
+ } else {
+ ZeroCount = 0;
+ }
auto itInterval = FetchingIntervals.find(intervalIdx);
AFL_VERIFY(itInterval != FetchingIntervals.end());
if (!Context->GetCommonContext()->GetReadMetadata()->IsSorted()) {
@@ -47,6 +55,7 @@ void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch
TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
: Context(context)
{
+ InFlightLimit = Context->GetReadMetadata()->Limit ? 1 : Max<ui32>();
while (sources.size()) {
auto source = sources.front();
BorderPoints[source->GetStart()].AddStart(source);
@@ -69,7 +78,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
}
bool TScanHead::BuildNextInterval() {
- while (BorderPoints.size()) {
+ while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
bool includeStart = firstBorderPointInfo.GetStartSources().size();
for (auto&& i : firstBorderPointInfo.GetStartSources()) {
@@ -131,7 +140,7 @@ void TScanHead::Abort() {
i.second->Abort();
}
FetchingIntervals.clear();
- AFL_VERIFY(BorderPoints.empty());
+ BorderPoints.clear();
Y_ABORT_UNLESS(IsFinished());
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index dad732928e..971cbc654b 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -44,7 +44,8 @@ private:
ui32 SegmentIdxCounter = 0;
std::vector<TIntervalStat> IntervalStats;
void DrainSources();
-
+ ui64 InFlightLimit = 1;
+ ui64 ZeroCount = 0;
public:
TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index 767e826282..254c6205b1 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -29,6 +29,9 @@ void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>&
void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter,
const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<IDataSource>& sourcePtr) {
+ if (IsAborted()) {
+ return;
+ }
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData"));
Y_ABORT_UNLESS(!FilterStageData);
FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch);
@@ -44,12 +47,18 @@ void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std:
Y_ABORT_UNLESS(!FetchingPlan);
FetchingPlan = fetchingPlan;
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
+ if (IsAborted()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted");
+ return;
+ }
DoStartFilterStage(sourcePtr);
}
}
void IDataSource::RegisterInterval(TFetchingInterval& interval) {
- AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second);
+ if (!FetchStageData) {
+ AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second);
+ }
}
void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds,
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
index ddb3ddc6a9..693a4353bc 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
@@ -39,6 +39,10 @@ protected:
TAtomic FilterStageFlag = 0;
+ bool IsAborted() const {
+ return AbortedFlag;
+ }
+
virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0;
virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0;
virtual void DoAbort() = 0;