diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-04-11 18:28:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-11 18:28:42 +0300 |
commit | 381d45aaf77a08c42b2f00fd7580dc497ab04d7f (patch) | |
tree | 12055e69f2f2acce5e38dd274ce56c24de7d8df5 | |
parent | 377a863d791f96389c1e8bdc20861e56302c0d90 (diff) | |
download | ydb-381d45aaf77a08c42b2f00fd7580dc497ab04d7f.tar.gz |
search engine with sequential syn_points for limit control (#16896)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
56 files changed, 1253 insertions, 592 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp index 82e7bafd87..152bae082d 100644 --- a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp @@ -131,12 +131,12 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray(const TColum const ui32 start = context.GetStartIndex().value_or(0); const ui32 count = context.GetRecordsCount().value_or(GetRecordsCount() - start); auto slice = ISlice(start, count); - if (context.GetFilter()) { - return ApplyFilter(*context.GetFilter(), nullptr)->GetChunkedArrayTrivial(); + if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) { + return slice->ApplyFilter(context.GetFilter()->Slice(start, count), slice)->GetChunkedArrayTrivial(); } else { return slice->GetChunkedArrayTrivial(); } - } else if (context.GetFilter()) { + } else if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) { return ApplyFilter(*context.GetFilter(), nullptr)->GetChunkedArrayTrivial(); } else { return GetChunkedArrayTrivial(); diff --git a/ydb/core/formats/arrow/accessor/abstract/common.cpp b/ydb/core/formats/arrow/accessor/abstract/common.cpp index 7b27b68985..04fc014e1e 100644 --- a/ydb/core/formats/arrow/accessor/abstract/common.cpp +++ b/ydb/core/formats/arrow/accessor/abstract/common.cpp @@ -15,20 +15,12 @@ TColumnConstructionContext& TColumnConstructionContext::SetFilter(const std::sha std::optional<TColumnConstructionContext> TColumnConstructionContext::Slice(const ui32 offset, const ui32 count) const { std::optional<TColumnConstructionContext> result; - if (StartIndex && RecordsCount) { - const ui32 start = std::max<ui32>(offset, *StartIndex); - const ui32 finish = std::min<ui32>(offset + count, *StartIndex + *RecordsCount); - if (finish <= start) { - result = std::nullopt; - } else { - result = TColumnConstructionContext().SetStartIndex(start - 
offset).SetRecordsCount(finish - start, count); - } - } else if (StartIndex && !RecordsCount) { - result = TColumnConstructionContext().SetStartIndex(std::max<ui32>(offset, *StartIndex) - offset); - } else if (!StartIndex && RecordsCount) { - result = TColumnConstructionContext().SetRecordsCount(std::min<ui32>(count, *RecordsCount), count); + const ui32 start = std::max<ui32>(offset, StartIndex.value_or(0)); + const ui32 finish = std::min<ui32>(offset + count, StartIndex.value_or(0) + RecordsCount.value_or(offset + count)); + if (finish <= start) { + result = std::nullopt; } else { - result = TColumnConstructionContext(); + result = TColumnConstructionContext().SetStartIndex(start - offset).SetRecordsCount(finish - start, count); } if (result && Filter) { result->SetFilter(std::make_shared<TColumnFilter>(Filter->Slice(offset, count))); diff --git a/ydb/core/formats/arrow/accessor/composite/accessor.cpp b/ydb/core/formats/arrow/accessor/composite/accessor.cpp index e5f4452549..eef73d9970 100644 --- a/ydb/core/formats/arrow/accessor/composite/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/composite/accessor.cpp @@ -119,6 +119,7 @@ std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::GetChunkedArray(con if (chunks.size()) { break; } else { + pos += i->GetRecordsCount(); continue; } } diff --git a/ydb/core/formats/arrow/program/collection.h b/ydb/core/formats/arrow/program/collection.h index 3941ac1abc..5d1f6a5b29 100644 --- a/ydb/core/formats/arrow/program/collection.h +++ b/ydb/core/formats/arrow/program/collection.h @@ -59,12 +59,19 @@ public: AFL_VERIFY(Markers.erase(marker)); } - bool IsEmptyFiltered() const { + bool IsEmptyFilter() const { return Filter->IsTotalDenyFilter(); } - bool HasAccessors() const { - return Accessors.size(); + bool HasData() const { + return Accessors.size() || !!RecordsCountActual; + } + + bool HasDataAndResultIsEmpty() const { + if (!HasData()) { + return false; + } + return !GetRecordsCountActualVerified() || 
IsEmptyFilter(); } std::optional<ui32> GetRecordsCountActualOptional() const { diff --git a/ydb/core/formats/arrow/program/graph_execute.cpp b/ydb/core/formats/arrow/program/graph_execute.cpp index 0bd1f099f1..a8e2f3f5c6 100644 --- a/ydb/core/formats/arrow/program/graph_execute.cpp +++ b/ydb/core/formats/arrow/program/graph_execute.cpp @@ -168,7 +168,7 @@ TConclusionStatus TCompiledGraph::Apply( AFL_VERIFY(*conclusion != IResourceProcessor::EExecutionResult::InBackground); } } - if (resources->IsEmptyFiltered()) { + if (resources->HasDataAndResultIsEmpty()) { resources->Clear(); return TConclusionStatus::Success(); } diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp index 81e576d9fa..5476975998 100644 --- a/ydb/core/formats/arrow/reader/position.cpp +++ b/ydb/core/formats/arrow/reader/position.cpp @@ -125,6 +125,12 @@ TSortableScanData::TSortableScanData( BuildPosition(position); } +TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch) { + Fields = batch->GetSchema()->GetFields(); + Columns = batch->GetColumns(); + BuildPosition(position); +} + TSortableScanData::TSortableScanData( const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns) { for (auto&& i : columns) { diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index 4f5278029d..a902744fe6 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -73,11 +73,47 @@ private: return StartPosition <= position && position < FinishPosition; } + std::partial_ordering CompareImpl(const ui64 position, const TSortableScanData& item, const ui64 itemPosition, const ui32 size) const { + AFL_VERIFY(size <= PositionAddress.size() && size <= item.PositionAddress.size()); + AFL_VERIFY(size); + if (Contains(position) && item.Contains(itemPosition)) { + for (ui32 idx = 0; idx < size; 
++idx) { + std::partial_ordering cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); + if (cmp != std::partial_ordering::equivalent) { + return cmp; + } + } + } else { + for (ui32 idx = 0; idx < size; ++idx) { + std::partial_ordering cmp = std::partial_ordering::equivalent; + const bool containsSelf = PositionAddress[idx].GetAddress().Contains(position); + const bool containsItem = item.PositionAddress[idx].GetAddress().Contains(itemPosition); + if (containsSelf && containsItem) { + cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); + } else if (containsSelf) { + auto temporaryAddress = item.Columns[idx]->GetChunk(item.PositionAddress[idx].GetAddress(), itemPosition); + cmp = PositionAddress[idx].Compare(position, temporaryAddress, itemPosition); + } else if (containsItem) { + auto temporaryAddress = Columns[idx]->GetChunk(PositionAddress[idx].GetAddress(), position); + cmp = temporaryAddress.Compare(position, item.PositionAddress[idx], itemPosition); + } else { + AFL_VERIFY(false); + } + if (cmp != std::partial_ordering::equivalent) { + return cmp; + } + } + } + + return std::partial_ordering::equivalent; + } + public: TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch); TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns); TSortableScanData(const ui64 position, const std::shared_ptr<arrow::Table>& batch, const std::vector<std::string>& columns); TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch, const std::vector<std::string>& columns); + TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch); TSortableScanData(const ui64 position, const ui64 recordsCount, const std::vector<std::shared_ptr<NAccessor::IChunkedArray>>& columns, const std::vector<std::shared_ptr<arrow::Field>>& fields) : RecordsCount(recordsCount) 
@@ -117,36 +153,11 @@ public: std::partial_ordering Compare(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const { AFL_VERIFY(PositionAddress.size() == item.PositionAddress.size()); - if (Contains(position) && item.Contains(itemPosition)) { - for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) { - std::partial_ordering cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); - if (cmp != std::partial_ordering::equivalent) { - return cmp; - } - } - } else { - for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) { - std::partial_ordering cmp = std::partial_ordering::equivalent; - const bool containsSelf = PositionAddress[idx].GetAddress().Contains(position); - const bool containsItem = item.PositionAddress[idx].GetAddress().Contains(itemPosition); - if (containsSelf && containsItem) { - cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); - } else if (containsSelf) { - auto temporaryAddress = item.Columns[idx]->GetChunk(item.PositionAddress[idx].GetAddress(), itemPosition); - cmp = PositionAddress[idx].Compare(position, temporaryAddress, itemPosition); - } else if (containsItem) { - auto temporaryAddress = Columns[idx]->GetChunk(PositionAddress[idx].GetAddress(), position); - cmp = temporaryAddress.Compare(position, item.PositionAddress[idx], itemPosition); - } else { - AFL_VERIFY(false); - } - if (cmp != std::partial_ordering::equivalent) { - return cmp; - } - } - } + return CompareImpl(position, item, itemPosition, item.PositionAddress.size()); + } - return std::partial_ordering::equivalent; + std::partial_ordering ComparePartial(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const { + return CompareImpl(position, item, itemPosition, std::min<ui32>(PositionAddress.size(), item.PositionAddress.size())); } void AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const ui64 position, ui64* recordSize) const; 
@@ -414,6 +425,12 @@ public: return ApplyOptionalReverseForCompareResult(directResult); } + std::partial_ordering ComparePartial(const TSortableBatchPosition& item) const { + Y_ABORT_UNLESS(item.ReverseSort == ReverseSort); + const auto directResult = Sorting->ComparePartial(Position, *item.Sorting, item.GetPosition()); + return ApplyOptionalReverseForCompareResult(directResult); + } + std::partial_ordering Compare(const TSortableScanData& data, const ui64 dataPosition) const { return Sorting->Compare(Position, data, dataPosition); } diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index 951cc57bfb..7a6c52b5c6 100644 --- a/ydb/core/formats/arrow/ut/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -149,7 +149,13 @@ struct TSumData { } static void CheckResult(ETest test, const std::shared_ptr<TAccessorsCollection>& batch, ui32 numKeys, bool nullable) { - AFL_VERIFY(batch->GetColumnsCount() == numKeys + 2); + if (test == ETest::EMPTY) { + UNIT_ASSERT(!batch->HasData()); + return; + } else { + AFL_VERIFY(batch->GetColumnsCount() == numKeys + 2); + } + auto aggXOriginal = batch->GetArrayVerified(3); auto aggYOriginal = batch->GetArrayVerified(4); auto colXOriginal = batch->GetArrayVerified(1); diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h index 8de06ab4cf..d3074fb2bf 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.h +++ b/ydb/core/tx/columnshard/engines/predicate/filter.h @@ -168,7 +168,6 @@ private: } virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const override { - AFL_VERIFY(!!PrimaryKey); return PrimaryKey; } @@ -242,7 +241,7 @@ private: if (SourceId != entity.GetEntityId()) { return false; } - AFL_VERIFY(RecordIndex <= entity.GetEntityRecordsCount()); + AFL_VERIFY(RecordIndex <= entity.GetEntityRecordsCount())("index", RecordIndex)("count", entity.GetEntityRecordsCount()); usage 
= RecordIndex < entity.GetEntityRecordsCount(); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h index 37ba57b899..6968fd1be7 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NReader { class TScanIteratorBase { protected: - virtual void DoOnSentDataFromInterval(const ui32 /*intervalIdx*/) const { + virtual void DoOnSentDataFromInterval(const TPartialSourceAddress& /*intervalAddress*/) { } public: @@ -21,9 +21,9 @@ public: virtual const TReadStats& GetStats() const; - void OnSentDataFromInterval(const std::optional<ui32> intervalIdx) const { - if (intervalIdx) { - DoOnSentDataFromInterval(*intervalIdx); + void OnSentDataFromInterval(const std::optional<TPartialSourceAddress>& intervalAddress) { + if (intervalAddress) { + DoOnSentDataFromInterval(*intervalAddress); } } diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 22bb1ce139..c9a63b0bf5 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -13,6 +13,7 @@ namespace NKikimr::NOlap::NReader { class TPartialReadResult; +class TPartialSourceAddress; class TComputeShardingPolicy { private: @@ -170,7 +171,7 @@ public: Started = true; return DoStart(); } - virtual void OnSentDataFromInterval(const ui32 intervalIdx) const = 0; + virtual void OnSentDataFromInterval(const TPartialSourceAddress& address) = 0; const TReadContext& GetContext() const { return *Context; diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index c424c84b9a..f847b6d177 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ 
b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -265,7 +265,7 @@ bool TColumnShardScan::ProduceResults() noexcept { }
Result->LastCursorProto = CurrentLastReadKey->SerializeToProto();
SendResult(false, false);
- ScanIterator->OnSentDataFromInterval(result.GetNotFinishedIntervalIdx());
+ ScanIterator->OnSentDataFromInterval(result.GetNotFinishedInterval());
ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.cpp b/ydb/core/tx/columnshard/engines/reader/common/result.cpp index 92f55f3dfc..38805a9982 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common/result.cpp @@ -55,12 +55,12 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards, const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context, - const std::optional<ui32> notFinishedIntervalIdx) + const std::optional<TPartialSourceAddress> notFinishedInterval) : ResourceGuards(resourceGuards) , GroupGuard(gGuard) , ResultBatch(batch) , ScanCursor(scanCursor) - , NotFinishedIntervalIdx(notFinishedIntervalIdx) + , NotFinishedInterval(notFinishedInterval) , Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) { Y_ABORT_UNLESS(ResultBatch.GetRecordsCount()); Y_ABORT_UNLESS(ScanCursor); diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h index f4e7d7d4b1..6fff2c8608 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.h +++ b/ydb/core/tx/columnshard/engines/reader/common/result.h @@ -12,6 +12,22 @@ namespace NKikimr::NOlap::NReader { class TReadContext; +class TPartialSourceAddress { +private: + YDB_READONLY(ui32, SourceId, 0); + YDB_READONLY(ui32, SourceIdx, 0); + YDB_READONLY(ui32, SyncPointIndex, 0); + +public: + TPartialSourceAddress(const ui32 sourceId, const ui32 sourceIdx, const ui32 syncPointIndex) + : SourceId(sourceId) + , SourceIdx(sourceIdx) + , SyncPointIndex(syncPointIndex) + { + + } +}; + // Represents a batch of rows produced by ASC or DESC scan with applied filters 
and partial aggregation class TPartialReadResult: public TNonCopyable { private: @@ -22,7 +38,7 @@ private: // This 1-row batch contains the last key that was read while producing the ResultBatch. // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit std::shared_ptr<IScanCursor> ScanCursor; - YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx); + YDB_READONLY_DEF(std::optional<TPartialSourceAddress>, NotFinishedInterval); const NColumnShard::TCounterGuard Guard; public: @@ -61,11 +77,11 @@ public: explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards, const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context, - const std::optional<ui32> notFinishedIntervalIdx); + const std::optional<TPartialSourceAddress> notFinishedInterval); explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor, - const std::shared_ptr<TReadContext>& context, const std::optional<ui32> notFinishedIntervalIdx) - : TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedIntervalIdx) { + const std::shared_ptr<TReadContext>& context, const std::optional<TPartialSourceAddress> notFinishedInterval) + : TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedInterval) { } }; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp index 800ef6841e..6d40278016 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp @@ -10,7 +10,7 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSu 
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("fbf")); Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData())); AFL_VERIFY(Step.Next()); - auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task, Context->GetCommonContext()->GetConveyorProcessId()); } @@ -54,7 +54,7 @@ void TColumnsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::N for (auto&& i : DataFetchers) { Source->MutableStageData().AddFetcher(i.second); } - auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), Source->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), Source->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task, Source->GetContext()->GetCommonContext()->GetConveyorProcessId()); } else { FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("cf_next")); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp index 51bfe780e3..6f9219bc9f 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp @@ -55,7 +55,7 @@ bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr } Step.Next(); FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, data->AddEvent("fmalloc")); - auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(data, std::move(Step), 
data->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task, data->GetContext()->GetCommonContext()->GetConveyorProcessId()); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h index 6551b39c27..e80f376d79 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h @@ -149,8 +149,8 @@ public: } } - bool IsEmptyFiltered() const { - return Table->IsEmptyFiltered(); + bool IsEmptyWithData() const { + return Table->HasDataAndResultIsEmpty(); } void Clear() { @@ -177,12 +177,35 @@ public: } }; +class TSourceChunkToReply { +private: + YDB_READONLY(ui32, StartIndex, 0); + YDB_READONLY(ui32, RecordsCount, 0); + std::shared_ptr<arrow::Table> Table; + +public: + const std::shared_ptr<arrow::Table>& GetTable() const { + AFL_VERIFY(Table); + return Table; + } + + bool HasData() const { + return !!Table && Table->num_rows(); + } + + TSourceChunkToReply(const ui32 startIndex, const ui32 recordsCount, const std::shared_ptr<arrow::Table>& table) + : StartIndex(startIndex) + , RecordsCount(recordsCount) + , Table(table) { + } +}; + class TFetchedResult { private: YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Batch); YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedFilter); std::optional<std::deque<TPortionDataAccessor::TReadPage>> PagesToResult; - std::optional<std::shared_ptr<arrow::Table>> ChunkToReply; + std::optional<TSourceChunkToReply> ChunkToReply; TFetchedResult() = default; @@ -225,7 +248,7 @@ public: AFL_VERIFY(page.GetIndexStart() == indexStart)("real", page.GetIndexStart())("expected", indexStart); AFL_VERIFY(page.GetRecordsCount() == recordsCount)("real", page.GetRecordsCount())("expected", recordsCount); AFL_VERIFY(!ChunkToReply); - 
ChunkToReply = std::move(table); + ChunkToReply = TSourceChunkToReply(indexStart, recordsCount, std::move(table)); } bool IsFinished() const { @@ -236,7 +259,7 @@ public: return !!ChunkToReply; } - std::shared_ptr<arrow::Table> ExtractResultChunk() { + std::optional<TSourceChunkToReply> ExtractResultChunk() { AFL_VERIFY(!!ChunkToReply); auto result = std::move(*ChunkToReply); ChunkToReply.reset(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index f36ebb1a81..4319b0f944 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -15,6 +15,7 @@ namespace NKikimr::NOlap::NReader::NCommon { bool TStepAction::DoApply(IDataReader& owner) const { if (FinishedFlag) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); + Source->StartSyncSection(); Source->OnSourceFetchingFinishedSafe(owner, Source); } return true; @@ -35,11 +36,17 @@ TConclusionStatus TStepAction::DoExecuteImpl() { return TConclusionStatus::Success(); } -TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) +TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, + const bool changeSyncSection) : TBase(ownerActorId) , Source(source) , Cursor(std::move(cursor)) , CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) { + if (changeSyncSection) { + Source->StartAsyncSection(); + } else { + Source->CheckAsyncSection(); + } } TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSource>& source) { @@ -48,14 +55,16 @@ TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSour Script->OnExecute(); 
AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); while (!Script->IsFinished(CurrentStepIdx)) { - if (source->HasStageData() && source->GetStageData().IsEmptyFiltered()) { + if (source->HasStageData() && source->GetStageData().IsEmptyWithData()) { source->OnEmptyStageData(source); break; + } else if (source->HasStageResult() && source->GetStageResult().IsEmpty()) { + break; } auto step = Script->GetStep(CurrentStepIdx); TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(), IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx)("source_id", source->GetSourceId()); const TMonotonic startInstant = TMonotonic::Now(); const TConclusion<bool> resultStep = step->ExecuteInplace(source, *this); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 952c3cda3a..2c5ff6b50d 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -331,10 +331,11 @@ public: } template <class T> - TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) - : TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId) { + TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, const bool changeSyncSection) + : TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId, changeSyncSection) { } - TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& 
ownerActorId); + TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, + const bool changeSyncSection); }; class TProgramStep: public IFetchingStep { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp index 609773c897..21a7e016ba 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp @@ -24,8 +24,8 @@ TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() { return IndexedData->ReadNextInterval(); } -void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const { - return IndexedData->OnSentDataFromInterval(intervalIdx); +void TColumnShardScanIterator::DoOnSentDataFromInterval(const TPartialSourceAddress& address) { + return IndexedData->OnSentDataFromInterval(address); } TColumnShardScanIterator::~TColumnShardScanIterator() { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h index cd570090aa..f525af31b9 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h @@ -55,7 +55,7 @@ public: class TColumnShardScanIterator: public TScanIteratorBase { private: - virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override; + virtual void DoOnSentDataFromInterval(const TPartialSourceAddress& address) override; protected: ui64 ItemsRead = 0; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h index c8c62b09af..afdc0b4e36 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h +++ 
b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h @@ -116,6 +116,7 @@ public: class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { private: + TAtomic SyncSectionFlag = 1; YDB_READONLY(ui64, SourceId, 0); YDB_READONLY(ui32, SourceIdx, 0); YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero()); @@ -161,6 +162,22 @@ protected: std::unique_ptr<TFetchedResult> StageResult; public: + void StartAsyncSection() { + AFL_VERIFY(AtomicCas(&SyncSectionFlag, 0, 1)); + } + + void CheckAsyncSection() { + AFL_VERIFY(AtomicGet(SyncSectionFlag) == 0); + } + + void StartSyncSection() { + AFL_VERIFY(AtomicCas(&SyncSectionFlag, 1, 0)); + } + + bool IsSyncSection() const { + return AtomicGet(SyncSectionFlag) == 1; + } + void AddEvent(const TString& evDescription) { AFL_VERIFY(!!Events); Events->AddEvent(evDescription); @@ -274,6 +291,10 @@ public: return DoStartFetchingColumns(sourcePtr, step, columns); } + void ResetSourceFinishedFlag() { + AFL_VERIFY(AtomicCas(&SourceFinishedSafeFlag, 0, 1)); + } + void OnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr<IDataSource>& sourcePtr) { AFL_VERIFY(AtomicCas(&SourceFinishedSafeFlag, 1, 0)); AFL_VERIFY(sourcePtr); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index 8636058113..bb355defb1 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -87,7 +87,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour auto plan = source->GetContext()->GetColumnsFetchingPlan(source); source->InitFetchingPlan(plan); TFetchingScriptCursor cursor(plan, 0); - auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = 
std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task); return false; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h index 960f49541b..2b2f5e250f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h @@ -45,8 +45,8 @@ protected: } public: - virtual void OnSentDataFromInterval(const ui32 intervalIdx) const override { - Scanner->OnSentDataFromInterval(intervalIdx); + virtual void OnSentDataFromInterval(const TPartialSourceAddress& address) override { + Scanner->OnSentDataFromInterval(address); } template <class T> diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index b0e489fa74..076dd63d49 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -21,10 +21,10 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result_received")("interval_idx", intervalIdx)( "intervalId", itInterval->second->GetIntervalId()); if (newBatch && newBatch->GetRecordsCount()) { - std::optional<ui32> callbackIdxSubscriver; + std::optional<TPartialSourceAddress> callbackIdxSubscriver; std::shared_ptr<NGroupedMemoryManager::TGroupGuard> gGuard; if (itInterval->second->HasMerger()) { - callbackIdxSubscriver = intervalIdx; + callbackIdxSubscriver = TPartialSourceAddress(itInterval->second->GetIntervalId(), intervalIdx, 0); } else { gGuard = itInterval->second->GetGroupGuard(); } @@ -203,4 +203,13 @@ void 
TScanHead::Abort() { Y_ABORT_UNLESS(IsFinished()); } +void TScanHead::OnSentDataFromInterval(const TPartialSourceAddress& address) const { + if (Context->IsAborted()) { + return; + } + auto it = FetchingIntervals.find(address.GetSourceIdx()); + AFL_VERIFY(it != FetchingIntervals.end())("interval_idx", address.GetSourceIdx())("count", FetchingIntervals.size()); + it->second->OnPartSendingComplete(); +} + } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h index 09649e7881..23888d4108 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h @@ -84,14 +84,7 @@ private: void DrainSources(); [[nodiscard]] TConclusionStatus DetectSourcesFeatureInContextIntervalScan(const THashMap<ui32, std::shared_ptr<IDataSource>>& intervalSources, const bool isExclusiveInterval) const; public: - void OnSentDataFromInterval(const ui32 intervalIdx) const { - if (Context->IsAborted()) { - return; - } - auto it = FetchingIntervals.find(intervalIdx); - AFL_VERIFY(it != FetchingIntervals.end())("interval_idx", intervalIdx)("count", FetchingIntervals.size()); - it->second->OnPartSendingComplete(); - } + void OnSentDataFromInterval(const TPartialSourceAddress& address) const; bool IsReverse() const; void Abort(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index c0300c23ae..9f74bc79c6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -36,7 +36,7 @@ void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::share return; } TFetchingScriptCursor cursor(FetchingPlan, 0); - auto task = 
std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } } @@ -170,7 +170,7 @@ private: AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size()); Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front())); AFL_VERIFY(Step.Next()); - auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } @@ -219,7 +219,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& const ISnapshotSchema::TPtr batchSchema = GetContext()->GetReadMetadata()->GetIndexVersions().GetSchemaVerified(GetCommitted().GetSchemaVersion()); const ISnapshotSchema::TPtr resultSchema = GetContext()->GetReadMetadata()->GetResultSchema(); - if (!GetStageData().GetTable()->HasAccessors()) { + if (!AssembledFlag) { + AssembledFlag = true; AFL_VERIFY(GetStageData().GetBlobs().size() == 1); auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 3823b0a1cc..69cce115df 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -308,6 +308,7 @@ private: using TBase = IDataSource; TCommittedBlob CommittedBlob; bool ReadStarted = 
false; + bool AssembledFlag = false; virtual void DoAbort() override { } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp deleted file mode 100644 index 1fb5b3dd31..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp +++ /dev/null @@ -1,115 +0,0 @@ -#include "collections.h" - -#include <ydb/core/tx/columnshard/engines/predicate/filter.h> - -namespace NKikimr::NOlap::NReader::NSimple { - -std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() { - AFL_VERIFY(HeapSources.size()); - std::pop_heap(HeapSources.begin(), HeapSources.end()); - auto result = HeapSources.back().Construct(Context); - AFL_VERIFY(FetchingInFlightSources.emplace(TCompareKeyForScanSequence::FromFinish(result)).second); - auto predPosition = std::move(HeapSources.back()); - HeapSources.pop_back(); - FetchingInFlightCount.Inc(); - return result; -} - -void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) { - if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) { - InFlightLimit = 2 * InFlightLimit; - } - FetchingInFlightCount.Dec(); - AFL_VERIFY(FetchingInFlightSources.erase(TCompareKeyForScanSequence::FromFinish(source))); - while (FinishedSources.size() && (HeapSources.empty() || FinishedSources.begin()->first < HeapSources.front().GetStart())) { - auto finishedSource = FinishedSources.begin()->second; - FetchedCount += finishedSource.GetRecordsCount(); - FinishedSources.erase(FinishedSources.begin()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource.GetSourceId())( - "source_idx", finishedSource.GetSourceIdx())("limit", Limit)("fetched", finishedSource.GetRecordsCount()); - if (Limit <= FetchedCount && HeapSources.size()) { - AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", 
Limit)("fetched", FetchedCount); - HeapSources.clear(); - } - } -} - -ui32 TScanWithLimitCollection::GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const { - AFL_VERIFY(from < to); - ui32 inFlightCountLocal = 0; - { - auto itFinishedFrom = FinishedSources.lower_bound(from); - auto itFinishedTo = FinishedSources.lower_bound(to); - for (auto&& it = itFinishedFrom; it != itFinishedTo; ++it) { - ++inFlightCountLocal; - } - } - { - auto itFetchingFrom = FetchingInFlightSources.lower_bound(from); - auto itFetchingTo = FetchingInFlightSources.lower_bound(to); - for (auto&& it = itFetchingFrom; it != itFetchingTo; ++it) { - ++inFlightCountLocal; - } - } - return inFlightCountLocal; -} - -TScanWithLimitCollection::TScanWithLimitCollection( - const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, const std::shared_ptr<IScanCursor>& cursor) - : TBase(context) - , Limit((ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) { - if (cursor && cursor->IsInitialized()) { - for (auto&& i : sources) { - bool usage = false; - if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) { - continue; - } - if (usage) { - i.SetIsStartedByCursor(); - } - break; - } - } - - HeapSources = std::move(sources); - std::make_heap(HeapSources.begin(), HeapSources.end()); -} - -void TScanWithLimitCollection::DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) { - std::vector<std::shared_ptr<arrow::ChunkedArray>> pkArrays; - for (auto&& f : Context->GetReadMetadata()->GetResultSchema()->GetIndexInfo().GetReplaceKey()->fields()) { - pkArrays.emplace_back(table->GetColumnByName(f->name())); - if (!pkArrays.back()) { - pkArrays.pop_back(); - break; - } - } - AFL_VERIFY(pkArrays.size()); - const ui32 partsCount = std::min<ui32>(10, table->num_rows()); - std::optional<i32> lastPosition; - for (ui32 i = 0; i < 
partsCount; ++i) { - const i32 currentPosition = (i + 1) * (table->num_rows() - 1) / partsCount; - if (lastPosition) { - AFL_VERIFY(*lastPosition < currentPosition); - } - const i64 size = lastPosition ? (currentPosition - *lastPosition) : currentPosition; - lastPosition = currentPosition; - TReplaceKeyAdapter key(NArrow::TComparablePosition(pkArrays, currentPosition), Context->GetReadMetadata()->IsDescSorted()); - TCompareKeyForScanSequence finishPos(key, source->GetSourceId()); - AFL_VERIFY(FinishedSources.emplace(finishPos, TFinishedDataSource(source, size)).second); - } -} - -ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context) - : Context(context) { - if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) { - MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest(); - } -} - -std::shared_ptr<NKikimr::NOlap::IScanCursor> TNotSortedCollection::DoBuildCursor( - const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const { - return std::make_shared<TNotSortedSimpleScanCursor>(source->GetSourceId(), readyRecords); -} - -} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h deleted file mode 100644 index b497c2d761..0000000000 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h +++ /dev/null @@ -1,244 +0,0 @@ -#pragma once -#include "context.h" -#include "source.h" - -#include <ydb/library/accessor/positive_integer.h> - -namespace NKikimr::NOlap::NReader::NSimple { - -class ISourcesCollection { -private: - virtual bool DoIsFinished() const = 0; - virtual std::shared_ptr<IDataSource> DoExtractNext() = 0; - virtual bool DoCheckInFlightLimits() const = 0; - virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0; - virtual void 
DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) = 0; - virtual void DoClear() = 0; - - TPositiveControlInteger SourcesInFlightCount; - YDB_READONLY(ui64, MaxInFlight, 1024); - - virtual TString DoDebugString() const { - return ""; - } - virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const = 0; - -protected: - const std::shared_ptr<TSpecialReadContext> Context; - -public: - std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const { - return DoBuildCursor(source, readyRecords); - } - - void OnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) { - return DoOnIntervalResult(table, source); - } - - TString DebugString() const { - return DoDebugString(); - } - - virtual ~ISourcesCollection() = default; - - std::shared_ptr<IDataSource> ExtractNext() { - SourcesInFlightCount.Inc(); - return DoExtractNext(); - } - - bool IsFinished() const { - return DoIsFinished(); - } - - void OnSourceFinished(const std::shared_ptr<IDataSource>& source) { - AFL_VERIFY(source); - SourcesInFlightCount.Dec(); - DoOnSourceFinished(source); - } - - bool CheckInFlightLimits() const { - return DoCheckInFlightLimits(); - } - - void Clear() { - DoClear(); - } - - ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context); -}; - -class TNotSortedCollection: public ISourcesCollection { -private: - using TBase = ISourcesCollection; - std::optional<ui32> Limit; - ui32 InFlightLimit = 1; - std::deque<TSourceConstructor> Sources; - TPositiveControlInteger InFlightCount; - ui32 FetchedCount = 0; - virtual void DoClear() override { - Sources.clear(); - } - virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override; - virtual bool DoIsFinished() const override { - return 
Sources.empty(); - } - virtual std::shared_ptr<IDataSource> DoExtractNext() override { - AFL_VERIFY(Sources.size()); - auto result = Sources.front().Construct(Context); - Sources.pop_front(); - InFlightCount.Inc(); - return result; - } - virtual bool DoCheckInFlightLimits() const override { - return InFlightCount < InFlightLimit; - } - virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override { - } - virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override { - if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) { - InFlightLimit *= 2; - } - FetchedCount += source->GetResultRecordsCount(); - if (Limit && *Limit <= FetchedCount && Sources.size()) { - AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount); - Sources.clear(); - } - InFlightCount.Dec(); - } - -public: - TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, - const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit) - : TBase(context) - , Limit(limit) { - if (Limit) { - InFlightLimit = 1; - } else { - InFlightLimit = GetMaxInFlight(); - } - if (cursor && cursor->IsInitialized()) { - while (sources.size()) { - bool usage = false; - if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(sources.front(), usage)) { - sources.pop_front(); - continue; - } - if (usage) { - sources.front().SetIsStartedByCursor(); - } - break; - } - } - Sources = std::move(sources); - } -}; - -class TSortedFullScanCollection: public ISourcesCollection { -private: - using TBase = ISourcesCollection; - std::deque<TSourceConstructor> HeapSources; - TPositiveControlInteger InFlightCount; - virtual void DoClear() override { - HeapSources.clear(); - } - virtual bool DoIsFinished() const override { - return HeapSources.empty(); - } - virtual 
std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override { - return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords); - } - virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override { - } - virtual std::shared_ptr<IDataSource> DoExtractNext() override { - AFL_VERIFY(HeapSources.size()); - auto result = HeapSources.front().Construct(Context); - std::pop_heap(HeapSources.begin(), HeapSources.end()); - HeapSources.pop_back(); - InFlightCount.Inc(); - return result; - } - virtual bool DoCheckInFlightLimits() const override { - return InFlightCount < GetMaxInFlight(); - } - virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& /*source*/) override { - InFlightCount.Dec(); - } - -public: - TSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, - const std::shared_ptr<IScanCursor>& cursor) - : TBase(context) { - if (cursor && cursor->IsInitialized()) { - for (auto&& i : sources) { - bool usage = false; - if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) { - continue; - } - if (usage) { - i.SetIsStartedByCursor(); - } - break; - } - } - HeapSources = std::move(sources); - std::make_heap(HeapSources.begin(), HeapSources.end()); - } -}; - -class TScanWithLimitCollection: public ISourcesCollection { -private: - using TBase = ISourcesCollection; - class TFinishedDataSource { - private: - YDB_READONLY(ui32, RecordsCount, 0); - YDB_READONLY(ui32, SourceId, 0); - YDB_READONLY(ui32, SourceIdx, 0); - - public: - TFinishedDataSource(const std::shared_ptr<IDataSource>& source) - : RecordsCount(source->GetResultRecordsCount()) - , SourceId(source->GetSourceId()) - , SourceIdx(source->GetSourceIdx()) { - } - - TFinishedDataSource(const std::shared_ptr<IDataSource>& source, 
const ui32 partSize) - : RecordsCount(partSize) - , SourceId(source->GetSourceId()) - , SourceIdx(source->GetSourceIdx()) { - AFL_VERIFY(partSize < source->GetResultRecordsCount()); - } - }; - - std::deque<TSourceConstructor> HeapSources; - TPositiveControlInteger FetchingInFlightCount; - TPositiveControlInteger FullIntervalsFetchingCount; - ui64 Limit = 0; - ui64 InFlightLimit = 1; - ui64 FetchedCount = 0; - std::map<TCompareKeyForScanSequence, TFinishedDataSource> FinishedSources; - std::set<TCompareKeyForScanSequence> FetchingInFlightSources; - - virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) override; - virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override { - return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords); - } - virtual void DoClear() override { - HeapSources.clear(); - } - virtual bool DoIsFinished() const override { - return HeapSources.empty(); - } - virtual std::shared_ptr<IDataSource> DoExtractNext() override; - virtual bool DoCheckInFlightLimits() const override { - return (FetchingInFlightCount < InFlightLimit); - //&&(FullIntervalsFetchingCount < InFlightLimit); - } - virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override; - ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const; - -public: - TScanWithLimitCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, - const std::shared_ptr<IScanCursor>& cursor); -}; - -} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp new file mode 100644 index 0000000000..d40f3ee3b2 --- 
/dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp @@ -0,0 +1,14 @@ +#include "abstract.h" + +#include <ydb/core/tx/columnshard/engines/predicate/filter.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context) + : Context(context) { + if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) { + MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest(); + } +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h new file mode 100644 index 0000000000..1c5f00d8dc --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h @@ -0,0 +1,77 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h> + +#include <ydb/library/accessor/positive_integer.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +class ISourcesCollection { +private: + virtual bool DoIsFinished() const = 0; + virtual std::shared_ptr<IDataSource> DoExtractNext() = 0; + virtual bool DoCheckInFlightLimits() const = 0; + virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0; + virtual void DoClear() = 0; + virtual void DoAbort() = 0; + + TPositiveControlInteger SourcesInFlightCount; + YDB_READONLY(ui64, MaxInFlight, 1024); + + virtual TString DoDebugString() const { + return ""; + } + virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const = 0; + virtual bool DoHasData() const = 0; + +protected: + const std::shared_ptr<TSpecialReadContext> Context; + +public: + bool 
HasData() const { + return DoHasData(); + } + + std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const { + AFL_VERIFY(source); + AFL_VERIFY(readyRecords <= source->GetRecordsCount())("count", source->GetRecordsCount())("ready", readyRecords); + return DoBuildCursor(source, readyRecords); + } + + TString DebugString() const { + return DoDebugString(); + } + + virtual ~ISourcesCollection() = default; + + std::shared_ptr<IDataSource> ExtractNext() { + SourcesInFlightCount.Inc(); + return DoExtractNext(); + } + + bool IsFinished() const { + return DoIsFinished(); + } + + void OnSourceFinished(const std::shared_ptr<IDataSource>& source) { + AFL_VERIFY(source); + SourcesInFlightCount.Dec(); + DoOnSourceFinished(source); + } + + bool CheckInFlightLimits() const { + return DoCheckInFlightLimits(); + } + + void Clear() { + DoClear(); + } + + void Abort() { + DoAbort(); + } + + ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp new file mode 100644 index 0000000000..a24f1158b6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp @@ -0,0 +1,5 @@ +#include "full_scan_sorted.h" + +namespace NKikimr::NOlap::NReader::NSimple { + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h new file mode 100644 index 0000000000..9fe6771159 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h @@ -0,0 +1,65 @@ +#pragma once +#include 
"abstract.h" + +#include <ydb/library/accessor/positive_integer.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +class TSortedFullScanCollection: public ISourcesCollection { +private: + using TBase = ISourcesCollection; + std::deque<TSourceConstructor> HeapSources; + TPositiveControlInteger InFlightCount; + ui32 SourceIdx = 0; + virtual void DoClear() override { + HeapSources.clear(); + } + virtual bool DoHasData() const override { + return HeapSources.size(); + } + virtual void DoAbort() override { + HeapSources.clear(); + } + virtual bool DoIsFinished() const override { + return HeapSources.empty(); + } + virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override { + return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords); + } + virtual std::shared_ptr<IDataSource> DoExtractNext() override { + AFL_VERIFY(HeapSources.size()); + auto result = HeapSources.front().Construct(SourceIdx++, Context); + std::pop_heap(HeapSources.begin(), HeapSources.end()); + HeapSources.pop_back(); + InFlightCount.Inc(); + return result; + } + virtual bool DoCheckInFlightLimits() const override { + return InFlightCount < GetMaxInFlight(); + } + virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& /*source*/) override { + InFlightCount.Dec(); + } + +public: + TSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, + const std::shared_ptr<IScanCursor>& cursor) + : TBase(context) { + if (cursor && cursor->IsInitialized()) { + for (auto&& i : sources) { + bool usage = false; + if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) { + continue; + } + if (usage) { + i.SetIsStartedByCursor(); + } + break; + } + } + HeapSources = std::move(sources); + std::make_heap(HeapSources.begin(), HeapSources.end()); + } +}; + +} // namespace 
NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp new file mode 100644 index 0000000000..33b2c050e5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp @@ -0,0 +1,51 @@ +#include "limit_sorted.h" + +namespace NKikimr::NOlap::NReader::NSimple { + +std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() { + AFL_VERIFY(HeapSources.size()); + std::pop_heap(HeapSources.begin(), HeapSources.end()); + auto result = NextSource ? NextSource : HeapSources.back().Construct(SourceIdxCurrent++, Context); + AFL_VERIFY(FetchingInFlightSources.emplace(result->GetSourceId()).second); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractNext")("source_id", result->GetSourceId()); + HeapSources.pop_back(); + if (HeapSources.size()) { + NextSource = HeapSources.front().Construct(SourceIdxCurrent++, Context); + } else { + NextSource = nullptr; + } + return result; +} + +void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoOnSourceFinished")("source_id", source->GetSourceId())("limit", Limit)( + "max", GetMaxInFlight())("in_flight_limit", InFlightLimit)("count", FetchingInFlightSources.size()); + if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) { + InFlightLimit = 2 * InFlightLimit; + } + AFL_VERIFY(Cleared || Aborted || FetchingInFlightSources.erase(source->GetSourceId()))("source_id", source->GetSourceId()); +} + +TScanWithLimitCollection::TScanWithLimitCollection( + const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, const std::shared_ptr<IScanCursor>& cursor) + : TBase(context) + , 
Limit((ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) { + HeapSources = std::move(sources); + std::make_heap(HeapSources.begin(), HeapSources.end()); + if (cursor && cursor->IsInitialized()) { + while (HeapSources.size()) { + bool usage = false; + if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(HeapSources.front(), usage)) { + std::pop_heap(HeapSources.begin(), HeapSources.end()); + HeapSources.pop_back(); + continue; + } + if (usage) { + HeapSources.front().SetIsStartedByCursor(); + } + break; + } + } +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h new file mode 100644 index 0000000000..e04b105964 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h @@ -0,0 +1,79 @@ +#pragma once +#include "abstract.h" + +#include <ydb/library/accessor/positive_integer.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +class TScanWithLimitCollection: public ISourcesCollection { +private: + using TBase = ISourcesCollection; + class TFinishedDataSource { + private: + YDB_READONLY(ui32, RecordsCount, 0); + YDB_READONLY(ui32, SourceId, 0); + YDB_READONLY(ui32, SourceIdx, 0); + + public: + TFinishedDataSource(const std::shared_ptr<IDataSource>& source) + : RecordsCount(source->GetResultRecordsCount()) + , SourceId(source->GetSourceId()) + , SourceIdx(source->GetSourceIdx()) { + } + + TFinishedDataSource(const std::shared_ptr<IDataSource>& source, const ui32 partSize) + : RecordsCount(partSize) + , SourceId(source->GetSourceId()) + , SourceIdx(source->GetSourceIdx()) { + AFL_VERIFY(partSize < source->GetResultRecordsCount()); + } + }; + + virtual bool DoHasData() const override { + return HeapSources.size(); + } + ui32 SourceIdxCurrent = 0; + std::shared_ptr<IDataSource> NextSource; + 
std::deque<TSourceConstructor> HeapSources; + ui64 Limit = 0; + ui64 InFlightLimit = 1; + std::set<ui32> FetchingInFlightSources; + bool Aborted = false; + bool Cleared = false; + + void DrainToLimit(); + + virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override { + return std::make_shared<TSimpleScanCursor>(nullptr, source->GetSourceId(), readyRecords); + } + virtual void DoClear() override { + Cleared = true; + HeapSources.clear(); + FetchingInFlightSources.clear(); + } + virtual void DoAbort() override { + Aborted = true; + HeapSources.clear(); + FetchingInFlightSources.clear(); + } + virtual bool DoIsFinished() const override { + return HeapSources.empty() && FetchingInFlightSources.empty(); + } + virtual std::shared_ptr<IDataSource> DoExtractNext() override; + virtual bool DoCheckInFlightLimits() const override { + return FetchingInFlightSources.size() < InFlightLimit; + } + + virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override; + ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const; + +public: + const std::shared_ptr<IDataSource>& GetNextSource() const { + return NextSource; + } + + TScanWithLimitCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, + const std::shared_ptr<IScanCursor>& cursor); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp new file mode 100644 index 0000000000..8962f02633 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp @@ -0,0 +1,10 @@ +#include "not_sorted.h" + +namespace NKikimr::NOlap::NReader::NSimple { + + std::shared_ptr<NKikimr::NOlap::IScanCursor> 
TNotSortedCollection::DoBuildCursor( + const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const { + return std::make_shared<TNotSortedSimpleScanCursor>(source->GetSourceId(), readyRecords); +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h new file mode 100644 index 0000000000..45b855d9db --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h @@ -0,0 +1,78 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NReader::NSimple { + +class TNotSortedCollection: public ISourcesCollection { +private: + using TBase = ISourcesCollection; + std::optional<ui32> Limit; + ui32 InFlightLimit = 1; + std::deque<TSourceConstructor> Sources; + TPositiveControlInteger InFlightCount; + ui32 FetchedCount = 0; + ui32 SourceIdx = 0; + virtual bool DoHasData() const override { + return Sources.size(); + } + virtual void DoClear() override { + Sources.clear(); + } + virtual void DoAbort() override { + Sources.clear(); + } + + virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override; + virtual bool DoIsFinished() const override { + return Sources.empty(); + } + virtual std::shared_ptr<IDataSource> DoExtractNext() override { + AFL_VERIFY(Sources.size()); + auto result = Sources.front().Construct(SourceIdx++, Context); + Sources.pop_front(); + InFlightCount.Inc(); + return result; + } + virtual bool DoCheckInFlightLimits() const override { + return InFlightCount < InFlightLimit; + } + virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override { + if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) { + InFlightLimit *= 2; + } + FetchedCount += source->GetResultRecordsCount(); + if (Limit && 
*Limit <= FetchedCount && Sources.size()) { + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount); + Sources.clear(); + } + InFlightCount.Dec(); + } + +public: + TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, + const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit) + : TBase(context) + , Limit(limit) { + if (Limit) { + InFlightLimit = 1; + } else { + InFlightLimit = GetMaxInFlight(); + } + if (cursor && cursor->IsInitialized()) { + while (sources.size()) { + bool usage = false; + if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(sources.front(), usage)) { + sources.pop_front(); + continue; + } + if (usage) { + sources.front().SetIsStartedByCursor(); + } + break; + } + } + Sources = std::move(sources); + } +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make new file mode 100644 index 0000000000..640e7bbbe8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + abstract.cpp + not_sorted.cpp + full_scan_sorted.cpp + limit_sorted.cpp +) + +PEERDIR( + ydb/core/formats/arrow +) + +END() diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index 9481c3c026..7d82fa6723 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -100,7 +100,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour source->InitFetchingPlan(plan); TFetchingScriptCursor cursor(plan, 0); 
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("sdmem")); - auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task); return false; } @@ -109,25 +109,19 @@ namespace { class TApplySourceResult: public IDataTasksProcessor::ITask { private: using TBase = IDataTasksProcessor::ITask; - YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, Result); YDB_READONLY_DEF(std::shared_ptr<IDataSource>, Source); - YDB_READONLY(ui32, StartIndex, 0); - YDB_READONLY(ui32, OriginalRecordsCount, 0); NColumnShard::TCounterGuard Guard; TFetchingScriptCursor Step; public: - TString GetTaskClassIdentifier() const override { + virtual TString GetTaskClassIdentifier() const override { return "TApplySourceResult"; } - TApplySourceResult(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& result, const ui32 startIndex, - const ui32 originalRecordsCount, const TFetchingScriptCursor& step) + TApplySourceResult( + const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) : TBase(NActors::TActorId()) - , Result(result) , Source(source) - , StartIndex(startIndex) - , OriginalRecordsCount(originalRecordsCount) , Guard(source->GetContext()->GetCommonContext()->GetCounters().GetResultsForSourceGuard()) , Step(step) { } @@ -138,9 +132,9 @@ public: } virtual bool DoApply(IDataReader& indexedDataRead) const override { auto* plainReader = static_cast<TPlainReadData*>(&indexedDataRead); - auto resultCopy = Result; Source->SetCursor(Step); - plainReader->MutableScanner().OnSourceReady(Source, std::move(resultCopy), StartIndex, OriginalRecordsCount, *plainReader); + Source->StartSyncSection(); + plainReader->MutableScanner().GetResultSyncPoint()->OnSourcePrepared(Source, 
*plainReader); return true; } }; @@ -164,33 +158,47 @@ TConclusion<bool> TBuildResultStep::DoExecuteInplace(const std::shared_ptr<IData resultBatch = nullptr; } } - + const ui32 recordsCount = resultBatch ? resultBatch->num_rows() : 0; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TBuildResultStep")("source_id", source->GetSourceId())("count", recordsCount); + context->GetCommonContext()->GetCounters().OnSourceFinished(source->GetRecordsCount(), source->GetUsedRawBytes(), recordsCount); + source->MutableResultRecordsCount() += recordsCount; + if (!resultBatch || !resultBatch->num_rows()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust()); + } + source->MutableStageResult().SetResultChunk(std::move(resultBatch), StartIndex, RecordsCount); NActors::TActivationContext::AsActorContext().Send(context->GetCommonContext()->GetScanActorId(), - new NColumnShard::TEvPrivate::TEvTaskProcessedResult( - std::make_shared<TApplySourceResult>(source, std::move(resultBatch), StartIndex, RecordsCount, step))); + new NColumnShard::TEvPrivate::TEvTaskProcessedResult(std::make_shared<TApplySourceResult>(source, step))); return false; } TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const { - NCommon::TFetchingScriptBuilder acc(*source->GetContext()); + const auto context = source->GetContext(); + NCommon::TFetchingScriptBuilder acc(*context); if (source->IsSourceInMemory()) { AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1); } + AFL_VERIFY(!source->GetStageResult().IsEmpty()); for (auto&& i : source->GetStageResult().GetPagesToResultVerified()) { - if (source->GetIsStartedByCursor() && !source->GetContext()->GetCommonContext()->GetScanCursor()->CheckSourceIntervalUsage( + if (source->GetIsStartedByCursor() && !context->GetCommonContext()->GetScanCursor()->CheckSourceIntervalUsage( 
source->GetSourceId(), i.GetIndexStart(), i.GetRecordsCount())) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TPrepareResultStep_ResultStep_SKIP_CURSOR")("source_id", source->GetSourceId()); continue; + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TPrepareResultStep_ResultStep")("source_id", source->GetSourceId()); } acc.AddStep(std::make_shared<TBuildResultStep>(i.GetIndexStart(), i.GetRecordsCount())); } auto plan = std::move(acc).Build(); AFL_VERIFY(!plan->IsFinished(0)); source->InitFetchingPlan(plan); - - TFetchingScriptCursor cursor(plan, 0); - auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId()); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); - return false; + if (source->NeedFullAnswer()) { + TFetchingScriptCursor cursor(plan, 0); + auto task = std::make_shared<TStepAction>(source, std::move(cursor), context->GetCommonContext()->GetScanActorId(), false); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); + return false; + } else { + return true; + } } } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp index ab4cc51d6c..898c6f92c5 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp @@ -57,4 +57,11 @@ void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& PartialResults.emplace_back(result); } +void TPlainReadData::OnSentDataFromInterval(const TPartialSourceAddress& sourceAddress) { + if (!SpecialReadContext->IsActive()) { + return; + } + Scanner->GetSyncPoint(sourceAddress.GetSyncPointIndex())->Continue(sourceAddress, *this); +} + } // namespace NKikimr::NOlap::NReader::NSimple diff --git 
a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h index adfe861d63..19c8e14ace 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h @@ -60,12 +60,7 @@ public: TScanHead& MutableScanner() { return *Scanner; } - virtual void OnSentDataFromInterval(const ui32 sourceIdx) const override { - if (!SpecialReadContext->IsActive()) { - return; - } - Scanner->ContinueSource(sourceIdx); - } + virtual void OnSentDataFromInterval(const TPartialSourceAddress& sourceAddress) override; void OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index deecda0487..94602e9878 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -1,6 +1,12 @@ #include "plain_read_data.h" #include "scanner.h" +#include "collections/full_scan_sorted.h" +#include "collections/limit_sorted.h" +#include "collections/not_sorted.h" +#include "sync_points/limit.h" +#include "sync_points/result.h" + #include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h> #include <ydb/core/tx/columnshard/engines/reader/common/result.h> @@ -8,59 +14,6 @@ namespace NKikimr::NOlap::NReader::NSimple { -void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& tableExt, const ui32 startIndex, - const ui32 recordsCount, TPlainReadData& reader) { - FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("f")); - AFL_DEBUG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG)("event_log", source->GetEventsReport())("count", 
FetchingSources.size()); - source->MutableResultRecordsCount() += tableExt ? tableExt->num_rows() : 0; - if (!tableExt || !tableExt->num_rows()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust()); - } - Context->GetCommonContext()->GetCounters().OnSourceFinished( - source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0); - - source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount); - while (FetchingSources.size()) { - auto frontSource = FetchingSources.front(); - if (!frontSource->HasStageResult()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())( - "source_idx", frontSource->GetSourceIdx()); - break; - } - if (!frontSource->GetStageResult().HasResultChunk()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())( - "source_idx", frontSource->GetSourceIdx()); - break; - } - auto table = frontSource->MutableStageResult().ExtractResultChunk(); - const bool isFinished = frontSource->GetStageResult().IsFinished(); - std::optional<ui32> sourceIdxToContinue; - if (!isFinished) { - sourceIdxToContinue = frontSource->GetSourceIdx(); - } - if (table && table->num_rows()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())( - "source_idx", frontSource->GetSourceIdx())("table", table->num_rows()); - auto cursor = SourcesCollection->BuildCursor(frontSource, startIndex + recordsCount); - reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, - cursor, Context->GetCommonContext(), sourceIdxToContinue)); - SourcesCollection->OnIntervalResult(table, frontSource); - } else if (sourceIdxToContinue) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", 
frontSource->GetSourceId())( - "source_idx", frontSource->GetSourceIdx()); - ContinueSource(*sourceIdxToContinue); - break; - } - if (!isFinished) { - break; - } - AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx())); - FetchingSources.pop_front(); - frontSource->ClearResult(); - SourcesCollection->OnSourceFinished(frontSource); - } -} - TConclusionStatus TScanHead::Start() { return TConclusionStatus::Success(); } @@ -69,29 +22,30 @@ TScanHead::TScanHead(std::deque<TSourceConstructor>&& sources, const std::shared : Context(context) { if (Context->GetReadMetadata()->IsSorted()) { if (Context->GetReadMetadata()->HasLimit()) { - SourcesCollection = - std::make_unique<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor()); + auto collection = + std::make_shared<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor()); + SourcesCollection = collection; + SyncPoints.emplace_back(std::make_shared<TSyncPointLimitControl>( + (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust(), SyncPoints.size(), context, collection)); } else { SourcesCollection = - std::make_unique<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor()); + std::make_shared<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor()); } } else { - SourcesCollection = std::make_unique<TNotSortedCollection>( + SourcesCollection = std::make_shared<TNotSortedCollection>( Context, std::move(sources), context->GetCommonContext()->GetScanCursor(), Context->GetReadMetadata()->GetLimitRobustOptional()); } + SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection)); + for (ui32 i = 0; i + 1 < SyncPoints.size(); ++i) { + SyncPoints[i]->SetNext(SyncPoints[i + 1]); + } } TConclusion<bool> TScanHead::BuildNextInterval() { - if (!Context->IsActive()) { - return false; 
- } bool changed = false; - while (!SourcesCollection->IsFinished() && SourcesCollection->CheckInFlightLimits() && Context->IsActive()) { + while (SourcesCollection->HasData() && SourcesCollection->CheckInFlightLimits()) { auto source = SourcesCollection->ExtractNext(); - source->InitFetchingPlan(Context->GetColumnsFetchingPlan(source)); - source->StartProcessing(source); - FetchingSources.emplace_back(source); - AFL_VERIFY(FetchingSourcesByIdx.emplace(source->GetSourceIdx(), source).second); + SyncPoints.front()->AddSource(source); changed = true; } return changed; @@ -107,16 +61,15 @@ bool TScanHead::IsReverse() const { void TScanHead::Abort() { AFL_VERIFY(!Context->IsActive()); - for (auto&& i : FetchingSources) { + for (auto&& i : SyncPoints) { i->Abort(); } - FetchingSources.clear(); - SourcesCollection->Clear(); + SourcesCollection->Abort(); Y_ABORT_UNLESS(IsFinished()); } TScanHead::~TScanHead() { - AFL_VERIFY(!IntervalsInFlightCount || !Context->IsActive()); + AFL_VERIFY(IsFinished() || !Context->IsActive()); } } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h index d38ebef6eb..c505433c14 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h @@ -1,7 +1,9 @@ #pragma once -#include "collections.h" #include "source.h" +#include "collections/abstract.h" +#include "sync_points/abstract.h" + #include <ydb/core/formats/arrow/reader/position.h> #include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h> @@ -14,27 +16,39 @@ class TPlainReadData; class TScanHead { private: std::shared_ptr<TSpecialReadContext> Context; - THashMap<ui64, std::shared_ptr<IDataSource>> FetchingSourcesByIdx; - std::deque<std::shared_ptr<IDataSource>> 
FetchingSources; - TPositiveControlInteger IntervalsInFlightCount; - std::unique_ptr<ISourcesCollection> SourcesCollection; - - void StartNextSource(const std::shared_ptr<TPortionDataSource>& source); + std::shared_ptr<ISourcesCollection> SourcesCollection; + std::vector<std::shared_ptr<ISyncPoint>> SyncPoints; public: - ~TScanHead(); + const std::shared_ptr<ISyncPoint>& GetResultSyncPoint() const { + return SyncPoints.back(); + } + + const std::shared_ptr<ISyncPoint>& GetSyncPoint(const ui32 index) const { + AFL_VERIFY(index < SyncPoints.size()); + return SyncPoints[index]; + } - void ContinueSource(const ui32 sourceIdx) const { - auto it = FetchingSourcesByIdx.find(sourceIdx); - AFL_VERIFY(it != FetchingSourcesByIdx.end())("source_idx", sourceIdx)("count", FetchingSourcesByIdx.size()); - it->second->ContinueCursor(it->second); + ISourcesCollection& MutableSourcesCollection() const { + return *SourcesCollection; } + const ISourcesCollection& GetSourcesCollection() const { + return *SourcesCollection; + } + + ~TScanHead(); + bool IsReverse() const; void Abort(); bool IsFinished() const { - return FetchingSources.empty() && SourcesCollection->IsFinished(); + for (auto&& i : SyncPoints) { + if (!i->IsFinished()) { + return false; + } + } + return SourcesCollection->IsFinished(); } const TReadContext& GetContext() const; @@ -42,16 +56,14 @@ public: TString DebugString() const { TStringBuilder sb; sb << "S:{" << SourcesCollection->DebugString() << "};"; - sb << "F:"; - for (auto&& i : FetchingSources) { - sb << i->GetSourceId() << ";"; + sb << "SP:["; + for (auto&& i : SyncPoints) { + sb << "{" << i->DebugString() << "};"; } + sb << "]"; return sb; } - void OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& table, const ui32 startIndex, - const ui32 recordsCount, TPlainReadData& reader); - TConclusionStatus Start(); TScanHead(std::deque<TSourceConstructor>&& sources, const std::shared_ptr<TSpecialReadContext>& context); diff --git 
a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 9f4b04cddd..d7505745e4 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -26,40 +26,46 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch } void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) { - AFL_VERIFY(!ProcessingStarted); - InitStageData(std::make_unique<TFetchedData>( - GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount())); AFL_VERIFY(FetchingPlan); - ProcessingStarted = true; - SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); - SetMemoryGroupId(SourceGroupGuard->GetGroupId()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); - // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); + if (!ProcessingStarted) { + InitStageData(std::make_unique<TFetchedData>( + GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount())); + ProcessingStarted = true; + SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( + GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); + SetMemoryGroupId(SourceGroupGuard->GetGroupId()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); + // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); + } 
TFetchingScriptCursor cursor(FetchingPlan, 0); - auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } void IDataSource::ContinueCursor(const std::shared_ptr<IDataSource>& sourcePtr) { - AFL_VERIFY(!!ScriptCursor); + AFL_VERIFY(!!ScriptCursor)("source_id", GetSourceId()); if (ScriptCursor->Next()) { - auto task = std::make_shared<TStepAction>(sourcePtr, std::move(*ScriptCursor), GetContext()->GetCommonContext()->GetScanActorId()); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", GetSourceId())("event", "ContinueCursor"); + auto cursor = std::move(*ScriptCursor); ScriptCursor.reset(); + auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); + } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", GetSourceId())("event", "CannotContinueCursor"); } } void IDataSource::DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr<NCommon::IDataSource>& sourcePtr) { auto* plainReader = static_cast<TPlainReadData*>(&owner); - plainReader->MutableScanner().OnSourceReady(std::static_pointer_cast<IDataSource>(sourcePtr), nullptr, 0, GetRecordsCount(), *plainReader); + auto sourceSimple = std::static_pointer_cast<IDataSource>(sourcePtr); + plainReader->MutableScanner().GetSyncPoint(sourceSimple->GetPurposeSyncPointIndex())->OnSourcePrepared(sourceSimple, *plainReader); } void IDataSource::DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) { TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT_EMPTY", 
IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); ResourceGuards.clear(); StageResult = TFetchedResult::BuildEmpty(); - StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) }); + StageResult->SetPages({}); ClearStageData(); } @@ -69,6 +75,7 @@ void IDataSource::DoBuildStageResult(const std::shared_ptr<NCommon::IDataSource> void IDataSource::Finalize(const std::optional<ui64> memoryLimit) { TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); + AFL_VERIFY(!GetStageData().IsEmptyWithData()); if (memoryLimit && !IsSourceInMemory()) { const auto accessor = GetStageData().GetPortionAccessor(); StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver()); @@ -77,6 +84,10 @@ void IDataSource::Finalize(const std::optional<ui64> memoryLimit) { StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver()); StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) }); } + if (StageResult->IsEmpty()) { + StageResult = TFetchedResult::BuildEmpty(); + StageResult->SetPages({}); + } ClearStageData(); } @@ -374,7 +385,7 @@ private: Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front())); Source->InitUsedRawBytes(); AFL_VERIFY(Step.Next()); - auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId(), false); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 290a6f2205..22a888aff5 100644 --- 
a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -51,6 +51,10 @@ private: NArrow::TComparablePosition Value; public: + const NArrow::TComparablePosition& GetValue() const { + return Value; + } + TReplaceKeyAdapter(const NArrow::TReplaceKey& rk, const bool reverse) : Reverse(reverse) , Value(rk) { @@ -138,6 +142,8 @@ private: virtual void DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) override; void Finalize(const std::optional<ui64> memoryLimit); + bool NeedFullAnswerFlag = true; + std::optional<ui32> PurposeSyncPointIndex; protected: std::optional<ui64> UsedRawBytes; @@ -149,6 +155,33 @@ protected: virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0; public: + bool NeedFullAnswer() const { + return NeedFullAnswerFlag; + } + + void SetNeedFullAnswer(const bool value) { + NeedFullAnswerFlag = value; + } + + ui32 GetPurposeSyncPointIndex() const { + AFL_VERIFY(PurposeSyncPointIndex); + return *PurposeSyncPointIndex; + } + + void ResetPurposeSyncPointIndex() { + AFL_VERIFY(PurposeSyncPointIndex); + PurposeSyncPointIndex.reset(); + } + + void SetPurposeSyncPointIndex(const ui32 value) { + if (!PurposeSyncPointIndex) { + AFL_VERIFY(value == 0); + } else { + AFL_VERIFY(*PurposeSyncPointIndex < value); + } + PurposeSyncPointIndex = value; + } + virtual void InitUsedRawBytes() = 0; ui64 GetUsedRawBytes() const { @@ -226,6 +259,9 @@ public: virtual bool HasIndexes(const std::set<ui32>& indexIds) const = 0; void InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching); + bool HasFetchingPlan() const { + return !!FetchingPlan; + } virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const = 0; @@ -470,10 +506,10 @@ public: return item.Start < Start; } - std::shared_ptr<TPortionDataSource> Construct(const std::shared_ptr<TSpecialReadContext>& context) 
const { + std::shared_ptr<TPortionDataSource> Construct(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context) const { const auto& portions = context->GetReadMetadata()->SelectInfo->Portions; - AFL_VERIFY(PortionIdx < portions.size()); - auto result = std::make_shared<TPortionDataSource>(PortionIdx, portions[PortionIdx], context); + AFL_VERIFY(sourceIdx < portions.size()); + auto result = std::make_shared<TPortionDataSource>(sourceIdx, portions[PortionIdx], context); if (IsStartedByCursorFlag) { result->SetIsStartedByCursor(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp new file mode 100644 index 0000000000..9d840e4643 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp @@ -0,0 +1,93 @@ +#include "abstract.h" + +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h> + +#include <util/string/builder.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +void ISyncPoint::OnSourcePrepared(const std::shared_ptr<IDataSource>& sourceInput, TPlainReadData& reader) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("aborted", AbortFlag); + if (AbortFlag) { + FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourceInput->AddEvent("a" + GetShortPointName())); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "sync_point_aborted")("source_id", sourceInput->GetSourceId()); + return; + } else { + FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourceInput->AddEvent("f" + GetShortPointName())); + } + AFL_DEBUG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG)("event_log", sourceInput->GetEventsReport())("count", SourcesSequentially.size())( + "source_id", sourceInput->GetSourceId()); + AFL_VERIFY(sourceInput->IsSyncSection())("source_id", 
sourceInput->GetSourceId()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "OnSourcePrepared")("source_id", sourceInput->GetSourceId()); + while (SourcesSequentially.size() && IsSourcePrepared(SourcesSequentially.front())) { + auto source = SourcesSequentially.front(); + switch (OnSourceReady(source, reader)) { + case ESourceAction::Finish: { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "finish_source")("source_id", source->GetSourceId()); + reader.GetScanner().MutableSourcesCollection().OnSourceFinished(source); + SourcesSequentially.pop_front(); + break; + } + case ESourceAction::ProvideNext: { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "provide_source")("source_id", source->GetSourceId()); + if (Next) { + source->ResetSourceFinishedFlag(); + Next->AddSource(source); + } else { + reader.GetScanner().MutableSourcesCollection().OnSourceFinished(source); + } + SourcesSequentially.pop_front(); + break; + } + case ESourceAction::Wait: { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "wait_source")("source_id", source->GetSourceId()); + return; + } + } + } +} + +TString ISyncPoint::DebugString() const { + TStringBuilder sb; + sb << "{"; + for (auto&& i : SourcesSequentially) { + sb << i->GetSourceId() << ","; + } + sb << "}"; + return sb; +} + +void ISyncPoint::Continue(const TPartialSourceAddress& continueAddress, TPlainReadData& /*reader*/) { + AFL_VERIFY(PointIndex == continueAddress.GetSyncPointIndex()); + AFL_VERIFY(SourcesSequentially.size() && SourcesSequentially.front()->GetSourceId() == continueAddress.GetSourceId())("first_source_id", + SourcesSequentially.front()->GetSourceId())( + "continue_source_id", continueAddress.GetSourceId()); + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("event", "continue_source"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", SourcesSequentially.front()->GetSourceId()); + 
SourcesSequentially.front()->ContinueCursor(SourcesSequentially.front()); +} + +void ISyncPoint::AddSource(const std::shared_ptr<IDataSource>& source) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("event", "add_source"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", source->GetSourceId()); + AFL_VERIFY(!AbortFlag); + source->SetPurposeSyncPointIndex(GetPointIndex()); + if (Next) { + source->SetNeedFullAnswer(false); + } + AFL_VERIFY(!!source); + if (!LastSourceIdx) { + LastSourceIdx = source->GetSourceIdx(); + } else { + AFL_VERIFY(*LastSourceIdx < source->GetSourceIdx()); + } + LastSourceIdx = source->GetSourceIdx(); + SourcesSequentially.emplace_back(source); + if (!source->HasFetchingPlan()) { + source->InitFetchingPlan(Context->GetColumnsFetchingPlan(source)); + } + OnAddSource(source); + source->StartProcessing(source); +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h new file mode 100644 index 0000000000..5ef3e4ede2 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h @@ -0,0 +1,83 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/reader/common/result.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h> + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap::NReader::NSimple { + +class TPlainReadData; +class ISourcesCollection; + +class ISyncPoint { +public: + enum class ESourceAction { + Finish, + ProvideNext, + Wait + }; + +private: + YDB_READONLY(ui32, PointIndex, 0); + YDB_READONLY_DEF(TString, PointName); + std::optional<ui32> LastSourceIdx; + virtual void OnAddSource(const 
std::shared_ptr<IDataSource>& /*source*/) { + } + virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const = 0; + virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) = 0; + virtual void DoAbort() = 0; + bool AbortFlag = false; + +protected: + const std::shared_ptr<TSpecialReadContext> Context; + const std::shared_ptr<ISourcesCollection> Collection; + std::shared_ptr<ISyncPoint> Next; + std::deque<std::shared_ptr<IDataSource>> SourcesSequentially; + +public: + virtual ~ISyncPoint() = default; + + void Continue(const TPartialSourceAddress& continueAddress, TPlainReadData& reader); + + TString DebugString() const; + + void Abort() { + SourcesSequentially.clear(); + if (!AbortFlag) { + AbortFlag = true; + DoAbort(); + } + } + + bool IsFinished() const { + return SourcesSequentially.empty(); + } + + void SetNext(const std::shared_ptr<ISyncPoint>& next) { + AFL_VERIFY(!Next); + Next = next; + } + + TString GetShortPointName() const { + if (PointName.size() < 2) { + return PointName; + } else { + return PointName.substr(0, 2); + } + } + + ISyncPoint(const ui32 pointIndex, const TString& pointName, const std::shared_ptr<TSpecialReadContext>& context, + const std::shared_ptr<ISourcesCollection>& collection) + : PointIndex(pointIndex) + , PointName(pointName) + , Context(context) + , Collection(collection) { + } + + void AddSource(const std::shared_ptr<IDataSource>& source); + + void OnSourcePrepared(const std::shared_ptr<IDataSource>& sourceInput, TPlainReadData& reader); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp new file mode 100644 index 0000000000..31c4977f35 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp @@ -0,0 +1,85 @@ +#include "limit.h" + 
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h> + +namespace NKikimr::NOlap::NReader::NSimple { + /* Builds the limit-control sync point: forwards pointIndex, the fixed name "SYNC_LIMIT", context and collection to the ISyncPoint base, stores the limit, and keeps the collection under its concrete TScanWithLimitCollection type. A null collection fails verification. */ +TSyncPointLimitControl::TSyncPointLimitControl(const ui32 limit, const ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context, + const std::shared_ptr<TScanWithLimitCollection>& collection) + : TBase(pointIndex, "SYNC_LIMIT", context, collection) + , Limit(limit) + , Collection(collection) { + AFL_VERIFY(Collection); +} + /* Advances the heap of per-source iterators one step at a time and increments FetchedCount for every step after which the advanced iterator is still valid. Returns true as soon as FetchedCount reaches Limit. Returns false without advancing anything when Iterators is empty or when the heap front compares less (operator<) than an iterator built from Collection->GetNextSource(); also returns false when the heap front is not filled yet or when every iterator has been exhausted. NOTE(review): the step that exhausts an iterator (Next() returning false) does not increment FetchedCount - confirm this is intended. */ +bool TSyncPointLimitControl::DrainToLimit() { + std::optional<TSourceIterator> nextInHeap; + if (Collection->GetNextSource()) { + nextInHeap = TSourceIterator(Collection->GetNextSource()); + } + if (Iterators.empty() || (nextInHeap && Iterators.front() < *nextInHeap)) { + return false; + } + + while (Iterators.size()) { + if (!Iterators.front().IsFilled()) { + return false; + } + /* Move the heap front to back() so it can be advanced and then re-inserted or dropped. */ std::pop_heap(Iterators.begin(), Iterators.end()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "LimitIteratorNext")("source_id", Iterators.back().GetSourceId())( + "fetched", FetchedCount)("limit", Limit)("iterators", Iterators.size()); + if (!Iterators.back().Next()) { + Iterators.pop_back(); + } else { + std::push_heap(Iterators.begin(), Iterators.end()); + if (++FetchedCount >= Limit) { + return true; + } + } + } + return false; +} + /* Handles a source at this sync point. Returns Finish immediately once FetchedCount >= Limit. Otherwise verifies that the heap front belongs to this source, pops it, and either drops it (no batch or zero records) or replaces it with a filled iterator built from the primary-key columns present in the batch. Then runs DrainToLimit() and clears the collection if the limit was reached. Returns Finish when the stage result is empty and ProvideNext otherwise. */ +ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& /*reader*/) { + if (FetchedCount >= Limit) { + return ESourceAction::Finish; + } + const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey(); + const auto& g = source->GetStageResult().GetBatch(); + AFL_VERIFY(Iterators.size()); + AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId()); + std::pop_heap(Iterators.begin(), Iterators.end()); + if (!g || !g->GetRecordsCount()) { + Iterators.pop_back(); + } else { + std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> arrs; + /* Collect key columns in key order and stop at the first one missing from the batch, so arrs is a leading prefix of the key. */ for (auto&& i : 
rk.fields()) { + auto acc = g->GetAccessorByNameOptional(i->name()); + if (!acc) { + break; + } + arrs.emplace_back(acc); + } + AFL_VERIFY(arrs.size()); + /* The first filled source fixes the key prefix length; every later source must produce the same length. */ if (!PKPrefixSize) { + PKPrefixSize = arrs.size(); + } else { + AFL_VERIFY(*PKPrefixSize == arrs.size())("prefix", PKPrefixSize)("arr", arrs.size()); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoOnSourceCheckLimitFillIterator")("source_id", source->GetSourceId())( + "fetched", FetchedCount)("limit", Limit); + /* NOTE(review): only IsFilled() is verified below; if the filter accepts no rows the new iterator has IsValid() == false, and Next() verifies IsValidFlag - confirm callers guarantee at least one accepted row. */ Iterators.back() = TSourceIterator(arrs, source->GetStageResult().GetNotAppliedFilter(), source); + AFL_VERIFY(Iterators.back().IsFilled()); + std::push_heap(Iterators.begin(), Iterators.end()); + } + if (DrainToLimit()) { + Collection->Clear(); + } + if (source->GetStageResult().IsEmpty()) { + return ESourceAction::Finish; + } else { + return ESourceAction::ProvideNext; + } +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h new file mode 100644 index 0000000000..b18582345b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h @@ -0,0 +1,140 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NReader::NSimple { + +class TScanWithLimitCollection; + /* Sync point that keeps one key iterator per source in a heap (Iterators) and tracks FetchedCount against Limit. */ +class TSyncPointLimitControl: public ISyncPoint { +private: + using TBase = ISyncPoint; + + /* Threshold that FetchedCount is compared against. */ const ui32 Limit; + std::shared_ptr<TScanWithLimitCollection> Collection; + /* Incremented in DrainToLimit(). */ ui32 FetchedCount = 0; + /* Number of leading key columns used by filled iterators; set by the first source filled in OnSourceReady(). */ std::optional<ui32> PKPrefixSize; + + /* A source is prepared when it is in a sync section and has a stage result; that result must not hold a result chunk (verified). */ virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const override { + if (source->IsSyncSection() && source->HasStageResult()) { + AFL_VERIFY(!source->GetStageResult().HasResultChunk()); + return true; + } + return false; + } + /* Heap element: a cursor over the key values of one source. */ class TSourceIterator { + private: + std::shared_ptr<IDataSource> Source; + bool Reverse; + int Delta = 0; + i64 Start = 0; + 
/* NOTE(review): Finish is assigned in the filled constructor but is not read anywhere in the visible code. */ i64 Finish = 0; + std::shared_ptr<NArrow::NMerger::TRWSortableBatchPosition> SortableRecord; + /* Non-null only for iterators built by the arrays+filter constructor; IsFilled() tests it. */ std::shared_ptr<NArrow::TColumnFilter> Filter; + std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator; + bool IsValidFlag = true; + + /* Skips positions that the filter rejects, stepping FilterIterator and SortableRecord together (each step verifies that both agree on whether a next position exists). Returns true when standing on an accepted position and false when the filter iterator ends first. Declared const because it only mutates the pointed-to objects. */ bool ShiftWithFilter() const { + AFL_VERIFY(IsValidFlag); + while (!FilterIterator->GetCurrentAcceptance()) { + if (!FilterIterator->Next(1)) { + AFL_VERIFY(!SortableRecord->NextPosition(Delta)); + return false; + } else { + AFL_VERIFY(SortableRecord->NextPosition(Delta)); + } + } + return true; + } + + public: + const std::shared_ptr<IDataSource>& GetSource() const { + AFL_VERIFY(Source); + return Source; + } + + /* Unfilled iterator: positioned on the start key of the source only. Filter and FilterIterator stay null, so IsFilled() is false. NOTE(review): Source is dereferenced in the member initializers before AFL_VERIFY(Source) in the body runs. */ TSourceIterator(const std::shared_ptr<IDataSource>& source) + : Source(source) + , Reverse(Source->GetContext()->GetReadMetadata()->IsDescSorted()) + , Delta(Reverse ? -1 : 1) { + AFL_VERIFY(Source); + auto arr = Source->GetStart().GetValue().GetArrays(); + auto batch = + arrow::RecordBatch::Make(Source->GetSourceSchema()->GetIndexInfo().GetReplaceKey(), arr.front()->length(), std::move(arr)); + SortableRecord = + std::make_shared<NArrow::NMerger::TRWSortableBatchPosition>(batch, Source->GetStart().GetValue().GetMonoPosition(), Reverse); + } + + /* Filled iterator over the given key-prefix columns. A null filter is replaced by an allow-all filter; iteration starts at the last record when Reverse is set and at record 0 otherwise. NOTE(review): arrs.front() is evaluated in the member initializers before AFL_VERIFY(arrs.size()) in the body runs. */ TSourceIterator(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& arrs, + const std::shared_ptr<NArrow::TColumnFilter>& filter, const std::shared_ptr<IDataSource>& source) + : Source(source) + , Reverse(Source->GetContext()->GetReadMetadata()->IsDescSorted()) + , Delta(Reverse ? -1 : 1) + , Start(Reverse ? (arrs.front()->GetRecordsCount() - 1) : 0) + , Finish(Reverse ? 0 : (arrs.front()->GetRecordsCount() - 1)) + , Filter(filter ? 
filter : std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter())) { + AFL_VERIFY(arrs.size()); + AFL_VERIFY(arrs.front()->GetRecordsCount()); + FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(Reverse, arrs.front()->GetRecordsCount())); + auto prefixSchema = Source->GetSourceSchema()->GetIndexInfo().GetReplaceKeyPrefix(arrs.size()); + /* arrs is a const reference, so a local copy is made to move into the container. */ auto copyArrs = arrs; + auto batch = std::make_shared<NArrow::TGeneralContainer>(prefixSchema->fields(), std::move(copyArrs)); + SortableRecord = std::make_shared<NArrow::NMerger::TRWSortableBatchPosition>(batch, Start, Reverse); + /* Move to the first filter-accepted position; the flag becomes false if none is found. */ IsValidFlag = ShiftWithFilter(); + } + + ui64 GetSourceId() const { + AFL_VERIFY(Source); + return Source->GetSourceId(); + } + + /* True only for iterators built by the arrays+filter constructor, the only one that sets Filter. */ bool IsFilled() const { + return !!Filter; + } + + bool IsValid() const { + return IsValidFlag; + } + + /* Steps the key position and the filter iterator by one (verified to agree), then skips filter-rejected positions. Returns whether the iterator still stands on a valid position. Requires a valid, filled iterator (verified). */ bool Next() { + AFL_VERIFY(IsValidFlag); + AFL_VERIFY(!!SortableRecord); + AFL_VERIFY(!!Filter); + IsValidFlag = SortableRecord->NextPosition(Delta); + AFL_VERIFY(FilterIterator->Next(1) == IsValidFlag); + if (IsValidFlag) { + IsValidFlag = ShiftWithFilter(); + } + return IsValidFlag; + } + + /* Ordering is inverted relative to ComparePartial: this iterator is "less" when its key compares greater. The std heap functions keep the greatest element under operator< at front(), so the iterator whose key compares lowest sits there. On equivalent keys the iterator with the larger source id is "less". */ bool operator<(const TSourceIterator& item) const { + const auto cmp = SortableRecord->ComparePartial(*item.SortableRecord); + if (cmp == std::partial_ordering::equivalent) { + return item.Source->GetSourceId() < Source->GetSourceId(); + } + return cmp == std::partial_ordering::greater; + } + }; + + /* Maintained as a heap with std::push_heap / std::pop_heap. */ std::vector<TSourceIterator> Iterators; + + /* Registers a new source as an unfilled iterator (start key only) and restores the heap property. Verification fails if the limit has already been reached. */ virtual void OnAddSource(const std::shared_ptr<IDataSource>& source) override { + AFL_VERIFY(FetchedCount < Limit); + Iterators.emplace_back(TSourceIterator(source)); + std::push_heap(Iterators.begin(), Iterators.end()); + } + + /* Drops all per-source iterators on abort. */ virtual void DoAbort() override { + Iterators.clear(); + } + + virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) override; + + bool DrainToLimit(); + +public: + TSyncPointLimitControl(const ui32 limit, const 
ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context, + const std::shared_ptr<TScanWithLimitCollection>& collection); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp new file mode 100644 index 0000000000..6d301e9482 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp @@ -0,0 +1,35 @@ +#include "result.h" + +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h> + +namespace NKikimr::NOlap::NReader::NSimple { + /* Hands the next result chunk of a source to the reader. Returns Finish when the stage result is empty, Wait while the stage result is not finished, and ProvideNext (after clearing the result of the source) once it is finished. A chunk that has data is passed to reader.OnIntervalResult together with a cursor built at start index + records count; an unfinished source that produced no data chunk is asked to continue instead. */ +ISyncPoint::ESourceAction TSyncPointResult::OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) { + if (source->GetStageResult().IsEmpty()) { + return ESourceAction::Finish; + } + auto resultChunk = source->MutableStageResult().ExtractResultChunk(); + const bool isFinished = source->GetStageResult().IsFinished(); + if (resultChunk && resultChunk->HasData()) { + /* Only an unfinished source gets a continuation address (source id, source index, this point index). */ std::optional<TPartialSourceAddress> partialSourceAddress; + if (!isFinished) { + partialSourceAddress = TPartialSourceAddress(source->GetSourceId(), source->GetSourceIdx(), GetPointIndex()); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "has_result")("source_id", source->GetSourceId())( + "source_idx", source->GetSourceIdx())("table", resultChunk->GetTable()->num_rows())("is_finished", isFinished); + auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount()); + reader.OnIntervalResult(std::make_shared<TPartialReadResult>(source->GetResourceGuards(), source->GetGroupGuard(), + resultChunk->GetTable(), cursor, Context->GetCommonContext(), partialSourceAddress)); + } else if (!isFinished) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "continue_source")("source_id", source->GetSourceId())( + 
"source_idx", source->GetSourceIdx()); + source->ContinueCursor(source); + } + /* An unfinished source stays at this point; a finished one is cleared and the next source is requested. */ if (!isFinished) { + return ESourceAction::Wait; + } + source->ClearResult(); + return ESourceAction::ProvideNext; +} + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h new file mode 100644 index 0000000000..8df2879d93 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h @@ -0,0 +1,25 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NReader::NSimple { + /* Sync point named "RESULT" that passes source result chunks to the reader; it declares no data members of its own. */ +class TSyncPointResult: public ISyncPoint { +private: + using TBase = ISyncPoint; + /* Intentionally empty: this class adds no state to release on abort. */ virtual void DoAbort() override { + } + + virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) override; + /* A source is prepared when it is in a sync section and has a stage result that either holds a result chunk or is empty. */ virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const override { + return source->IsSyncSection() && source->HasStageResult() && + (source->GetStageResult().HasResultChunk() || source->GetStageResult().IsEmpty()); + } + +public: + /* Forwards its arguments to ISyncPoint with the fixed point name "RESULT". */ TSyncPointResult( + const ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<ISourcesCollection>& collection) + : TBase(pointIndex, "RESULT", context, collection) { + } +}; + +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make new file mode 100644 index 0000000000..58d66933ef --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + abstract.cpp + result.cpp + limit.cpp +) + +PEERDIR( + ydb/core/formats/arrow +) + +END() diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make 
b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make index 3917d8d913..baff60941f 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make @@ -8,13 +8,14 @@ SRCS( context.cpp fetching.cpp iterator.cpp - collections.cpp ) PEERDIR( ydb/core/formats/arrow ydb/core/tx/columnshard/blobs_action ydb/core/tx/columnshard/engines/reader/common_reader/iterator + ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections + ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points ydb/core/tx/conveyor/usage ydb/core/tx/limiter/grouped_memory/usage ) diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 92574ef1a2..7fc0cce69a 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -382,6 +382,16 @@ public: return PKColumnIds[0]; } + /* Returns a schema made of the first `size` fields of PrimaryKey, or PrimaryKey itself when `size` equals its field count. `size` must be at least 1 and at most the number of PrimaryKey fields (both verified). */ std::shared_ptr<arrow::Schema> GetReplaceKeyPrefix(const ui32 size) const { + AFL_VERIFY(size); + AFL_VERIFY(size <= (ui32)PrimaryKey->num_fields()); + if (size == (ui32)PrimaryKey->num_fields()) { + return PrimaryKey; + } else { + std::vector<std::shared_ptr<arrow::Field>> fields(PrimaryKey->fields().begin(), PrimaryKey->fields().begin() + size); + return std::make_shared<arrow::Schema>(std::move(fields)); + } + } const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return PrimaryKey; } diff --git a/ydb/library/formats/arrow/replace_key.h b/ydb/library/formats/arrow/replace_key.h index 5e8de57478..c4fe75d21d 100644 --- a/ydb/library/formats/arrow/replace_key.h +++ b/ydb/library/formats/arrow/replace_key.h @@ -245,6 +245,23 @@ private: std::vector<ui32> Positions; public: + /* Returns the single value shared by every entry of Positions. Verification fails if the entries differ or if Positions is empty. */ ui32 GetMonoPosition() const { + std::optional<ui32> result; + for (auto&& i : Positions) { + if (!result) { + result = i; + } else { + AFL_VERIFY(*result == i); + } + } + 
AFL_VERIFY(result); + return *result; + } + + /* Read-only access to the stored Arrays member. */ const std::vector<std::shared_ptr<arrow::Array>>& GetArrays() const { + return Arrays; + } + TComparablePosition(const TReplaceKey& key) : Arrays(*key.GetColumns()) , Positions(Arrays.size(), key.GetPosition()) { |