diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-12-02 09:57:54 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-12-02 10:43:00 +0300 |
commit | e524bc608e30bc4fcfd01e857cb1df5d4a23b530 (patch) | |
tree | c76fc614ecb56e9d0385c496bbad6dedf203dbde | |
parent | 5bb5fa1eafe0aa2c8a8495c5e76079d7a507d697 (diff) | |
download | ydb-e524bc608e30bc4fcfd01e857cb1df5d4a23b530.tar.gz |
KIKIMR-19880: dont read all immediately if request has limit (q23 1.4s->200ms)
6 files changed, 30 insertions, 6 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 79349592d1..a2d4dc7bb8 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -43,7 +43,7 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead); IndexedData->Abort(); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 42f17fb44c..e98255ef00 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -166,12 +166,13 @@ TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPositio } void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) { - OnInitResourcesGuard(guard); + AFL_VERIFY(guard); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated") ("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size()); for (auto&& [_, i] : Sources) { i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i); } + OnInitResourcesGuard(guard); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index 3f0a818c74..b80813b1d8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -6,6 +6,14 @@ namespace NKikimr::NOlap::NPlainReader { void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) { + if (Context->GetReadMetadata()->Limit && (!newBatch || newBatch->GetRecordsCount() == 0) && InFlightLimit < 1000) { + if (++ZeroCount == std::max<ui64>(16, InFlightLimit)) { + InFlightLimit *= 2; + ZeroCount = 0; + } + } else { + ZeroCount = 0; + } auto itInterval = FetchingIntervals.find(intervalIdx); AFL_VERIFY(itInterval != FetchingIntervals.end()); if (!Context->GetCommonContext()->GetReadMetadata()->IsSorted()) { @@ -47,6 +55,7 @@ void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context) : Context(context) { + InFlightLimit = Context->GetReadMetadata()->Limit ? 1 : Max<ui32>(); while (sources.size()) { auto source = sources.front(); BorderPoints[source->GetStart()].AddStart(source); @@ -69,7 +78,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s } bool TScanHead::BuildNextInterval() { - while (BorderPoints.size()) { + while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) { auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); bool includeStart = firstBorderPointInfo.GetStartSources().size(); for (auto&& i : firstBorderPointInfo.GetStartSources()) { @@ -131,7 +140,7 @@ void TScanHead::Abort() { i.second->Abort(); } FetchingIntervals.clear(); - AFL_VERIFY(BorderPoints.empty()); + BorderPoints.clear(); Y_ABORT_UNLESS(IsFinished()); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index dad732928e..971cbc654b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -44,7 +44,8 @@ private: ui32 SegmentIdxCounter = 0; std::vector<TIntervalStat> IntervalStats; void DrainSources(); - + ui64 InFlightLimit = 1; + ui64 ZeroCount = 0; public: TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 767e826282..254c6205b1 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -29,6 +29,9 @@ void IDataSource::InitFetchStageData(const std::shared_ptr<arrow::RecordBatch>& void IDataSource::InitFilterStageData(const std::shared_ptr<NArrow::TColumnFilter>& appliedFilter, const std::shared_ptr<NArrow::TColumnFilter>& earlyFilter, const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<IDataSource>& sourcePtr) { + if (IsAborted()) { + return; + } NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData")); Y_ABORT_UNLESS(!FilterStageData); FilterStageData = std::make_shared<TFilterStageData>(appliedFilter, earlyFilter, batch); @@ -44,12 +47,18 @@ void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std: Y_ABORT_UNLESS(!FetchingPlan); FetchingPlan = fetchingPlan; NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); + if (IsAborted()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted"); + return; + } DoStartFilterStage(sourcePtr); } } void IDataSource::RegisterInterval(TFetchingInterval& interval) { - AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); + if (!FetchStageData) { + AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); + } } void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index ddb3ddc6a9..693a4353bc 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -39,6 +39,10 @@ protected: TAtomic FilterStageFlag = 0; + bool IsAborted() const { + return AbortedFlag; + } + virtual void DoStartFilterStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0; virtual void DoStartFetchStage(const std::shared_ptr<IDataSource>& sourcePtr) = 0; virtual void DoAbort() = 0; |