aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-04-11 18:28:42 +0300
committerGitHub <noreply@github.com>2025-04-11 18:28:42 +0300
commit381d45aaf77a08c42b2f00fd7580dc497ab04d7f (patch)
tree12055e69f2f2acce5e38dd274ce56c24de7d8df5
parent377a863d791f96389c1e8bdc20861e56302c0d90 (diff)
downloadydb-381d45aaf77a08c42b2f00fd7580dc497ab04d7f.tar.gz
search engine with sequential syn_points for limit control (#16896)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
-rw-r--r--ydb/core/formats/arrow/accessor/abstract/accessor.cpp6
-rw-r--r--ydb/core/formats/arrow/accessor/abstract/common.cpp18
-rw-r--r--ydb/core/formats/arrow/accessor/composite/accessor.cpp1
-rw-r--r--ydb/core/formats/arrow/program/collection.h13
-rw-r--r--ydb/core/formats/arrow/program/graph_execute.cpp2
-rw-r--r--ydb/core/formats/arrow/reader/position.cpp6
-rw-r--r--ydb/core/formats/arrow/reader/position.h75
-rw-r--r--ydb/core/formats/arrow/ut/ut_program_step.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/abstract.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/read_context.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common/result.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common/result.h24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h33
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h21
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp115
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h244
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h77
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h65
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp51
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h79
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h78
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp52
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp91
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h50
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h42
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp93
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h83
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp85
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h140
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp35
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h25
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h10
-rw-r--r--ydb/library/formats/arrow/replace_key.h17
56 files changed, 1253 insertions, 592 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
index 82e7bafd87..152bae082d 100644
--- a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
+++ b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
@@ -131,12 +131,12 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray(const TColum
const ui32 start = context.GetStartIndex().value_or(0);
const ui32 count = context.GetRecordsCount().value_or(GetRecordsCount() - start);
auto slice = ISlice(start, count);
- if (context.GetFilter()) {
- return ApplyFilter(*context.GetFilter(), nullptr)->GetChunkedArrayTrivial();
+ if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) {
+ return slice->ApplyFilter(context.GetFilter()->Slice(start, count), slice)->GetChunkedArrayTrivial();
} else {
return slice->GetChunkedArrayTrivial();
}
- } else if (context.GetFilter()) {
+ } else if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) {
return ApplyFilter(*context.GetFilter(), nullptr)->GetChunkedArrayTrivial();
} else {
return GetChunkedArrayTrivial();
diff --git a/ydb/core/formats/arrow/accessor/abstract/common.cpp b/ydb/core/formats/arrow/accessor/abstract/common.cpp
index 7b27b68985..04fc014e1e 100644
--- a/ydb/core/formats/arrow/accessor/abstract/common.cpp
+++ b/ydb/core/formats/arrow/accessor/abstract/common.cpp
@@ -15,20 +15,12 @@ TColumnConstructionContext& TColumnConstructionContext::SetFilter(const std::sha
std::optional<TColumnConstructionContext> TColumnConstructionContext::Slice(const ui32 offset, const ui32 count) const {
std::optional<TColumnConstructionContext> result;
- if (StartIndex && RecordsCount) {
- const ui32 start = std::max<ui32>(offset, *StartIndex);
- const ui32 finish = std::min<ui32>(offset + count, *StartIndex + *RecordsCount);
- if (finish <= start) {
- result = std::nullopt;
- } else {
- result = TColumnConstructionContext().SetStartIndex(start - offset).SetRecordsCount(finish - start, count);
- }
- } else if (StartIndex && !RecordsCount) {
- result = TColumnConstructionContext().SetStartIndex(std::max<ui32>(offset, *StartIndex) - offset);
- } else if (!StartIndex && RecordsCount) {
- result = TColumnConstructionContext().SetRecordsCount(std::min<ui32>(count, *RecordsCount), count);
+ const ui32 start = std::max<ui32>(offset, StartIndex.value_or(0));
+ const ui32 finish = std::min<ui32>(offset + count, StartIndex.value_or(0) + RecordsCount.value_or(offset + count));
+ if (finish <= start) {
+ result = std::nullopt;
} else {
- result = TColumnConstructionContext();
+ result = TColumnConstructionContext().SetStartIndex(start - offset).SetRecordsCount(finish - start, count);
}
if (result && Filter) {
result->SetFilter(std::make_shared<TColumnFilter>(Filter->Slice(offset, count)));
diff --git a/ydb/core/formats/arrow/accessor/composite/accessor.cpp b/ydb/core/formats/arrow/accessor/composite/accessor.cpp
index e5f4452549..eef73d9970 100644
--- a/ydb/core/formats/arrow/accessor/composite/accessor.cpp
+++ b/ydb/core/formats/arrow/accessor/composite/accessor.cpp
@@ -119,6 +119,7 @@ std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::GetChunkedArray(con
if (chunks.size()) {
break;
} else {
+ pos += i->GetRecordsCount();
continue;
}
}
diff --git a/ydb/core/formats/arrow/program/collection.h b/ydb/core/formats/arrow/program/collection.h
index 3941ac1abc..5d1f6a5b29 100644
--- a/ydb/core/formats/arrow/program/collection.h
+++ b/ydb/core/formats/arrow/program/collection.h
@@ -59,12 +59,19 @@ public:
AFL_VERIFY(Markers.erase(marker));
}
- bool IsEmptyFiltered() const {
+ bool IsEmptyFilter() const {
return Filter->IsTotalDenyFilter();
}
- bool HasAccessors() const {
- return Accessors.size();
+ bool HasData() const {
+ return Accessors.size() || !!RecordsCountActual;
+ }
+
+ bool HasDataAndResultIsEmpty() const {
+ if (!HasData()) {
+ return false;
+ }
+ return !GetRecordsCountActualVerified() || IsEmptyFilter();
}
std::optional<ui32> GetRecordsCountActualOptional() const {
diff --git a/ydb/core/formats/arrow/program/graph_execute.cpp b/ydb/core/formats/arrow/program/graph_execute.cpp
index 0bd1f099f1..a8e2f3f5c6 100644
--- a/ydb/core/formats/arrow/program/graph_execute.cpp
+++ b/ydb/core/formats/arrow/program/graph_execute.cpp
@@ -168,7 +168,7 @@ TConclusionStatus TCompiledGraph::Apply(
AFL_VERIFY(*conclusion != IResourceProcessor::EExecutionResult::InBackground);
}
}
- if (resources->IsEmptyFiltered()) {
+ if (resources->HasDataAndResultIsEmpty()) {
resources->Clear();
return TConclusionStatus::Success();
}
diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp
index 81e576d9fa..5476975998 100644
--- a/ydb/core/formats/arrow/reader/position.cpp
+++ b/ydb/core/formats/arrow/reader/position.cpp
@@ -125,6 +125,12 @@ TSortableScanData::TSortableScanData(
BuildPosition(position);
}
+TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch) {
+ Fields = batch->GetSchema()->GetFields();
+ Columns = batch->GetColumns();
+ BuildPosition(position);
+}
+
TSortableScanData::TSortableScanData(
const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns) {
for (auto&& i : columns) {
diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h
index 4f5278029d..a902744fe6 100644
--- a/ydb/core/formats/arrow/reader/position.h
+++ b/ydb/core/formats/arrow/reader/position.h
@@ -73,11 +73,47 @@ private:
return StartPosition <= position && position < FinishPosition;
}
+ std::partial_ordering CompareImpl(const ui64 position, const TSortableScanData& item, const ui64 itemPosition, const ui32 size) const {
+ AFL_VERIFY(size <= PositionAddress.size() && size <= item.PositionAddress.size());
+ AFL_VERIFY(size);
+ if (Contains(position) && item.Contains(itemPosition)) {
+ for (ui32 idx = 0; idx < size; ++idx) {
+ std::partial_ordering cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition);
+ if (cmp != std::partial_ordering::equivalent) {
+ return cmp;
+ }
+ }
+ } else {
+ for (ui32 idx = 0; idx < size; ++idx) {
+ std::partial_ordering cmp = std::partial_ordering::equivalent;
+ const bool containsSelf = PositionAddress[idx].GetAddress().Contains(position);
+ const bool containsItem = item.PositionAddress[idx].GetAddress().Contains(itemPosition);
+ if (containsSelf && containsItem) {
+ cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition);
+ } else if (containsSelf) {
+ auto temporaryAddress = item.Columns[idx]->GetChunk(item.PositionAddress[idx].GetAddress(), itemPosition);
+ cmp = PositionAddress[idx].Compare(position, temporaryAddress, itemPosition);
+ } else if (containsItem) {
+ auto temporaryAddress = Columns[idx]->GetChunk(PositionAddress[idx].GetAddress(), position);
+ cmp = temporaryAddress.Compare(position, item.PositionAddress[idx], itemPosition);
+ } else {
+ AFL_VERIFY(false);
+ }
+ if (cmp != std::partial_ordering::equivalent) {
+ return cmp;
+ }
+ }
+ }
+
+ return std::partial_ordering::equivalent;
+ }
+
public:
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch);
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::Table>& batch, const std::vector<std::string>& columns);
TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch, const std::vector<std::string>& columns);
+ TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch);
TSortableScanData(const ui64 position, const ui64 recordsCount, const std::vector<std::shared_ptr<NAccessor::IChunkedArray>>& columns,
const std::vector<std::shared_ptr<arrow::Field>>& fields)
: RecordsCount(recordsCount)
@@ -117,36 +153,11 @@ public:
std::partial_ordering Compare(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const {
AFL_VERIFY(PositionAddress.size() == item.PositionAddress.size());
- if (Contains(position) && item.Contains(itemPosition)) {
- for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) {
- std::partial_ordering cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition);
- if (cmp != std::partial_ordering::equivalent) {
- return cmp;
- }
- }
- } else {
- for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) {
- std::partial_ordering cmp = std::partial_ordering::equivalent;
- const bool containsSelf = PositionAddress[idx].GetAddress().Contains(position);
- const bool containsItem = item.PositionAddress[idx].GetAddress().Contains(itemPosition);
- if (containsSelf && containsItem) {
- cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition);
- } else if (containsSelf) {
- auto temporaryAddress = item.Columns[idx]->GetChunk(item.PositionAddress[idx].GetAddress(), itemPosition);
- cmp = PositionAddress[idx].Compare(position, temporaryAddress, itemPosition);
- } else if (containsItem) {
- auto temporaryAddress = Columns[idx]->GetChunk(PositionAddress[idx].GetAddress(), position);
- cmp = temporaryAddress.Compare(position, item.PositionAddress[idx], itemPosition);
- } else {
- AFL_VERIFY(false);
- }
- if (cmp != std::partial_ordering::equivalent) {
- return cmp;
- }
- }
- }
+ return CompareImpl(position, item, itemPosition, item.PositionAddress.size());
+ }
- return std::partial_ordering::equivalent;
+ std::partial_ordering ComparePartial(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const {
+ return CompareImpl(position, item, itemPosition, std::min<ui32>(PositionAddress.size(), item.PositionAddress.size()));
}
void AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const ui64 position, ui64* recordSize) const;
@@ -414,6 +425,12 @@ public:
return ApplyOptionalReverseForCompareResult(directResult);
}
+ std::partial_ordering ComparePartial(const TSortableBatchPosition& item) const {
+ Y_ABORT_UNLESS(item.ReverseSort == ReverseSort);
+ const auto directResult = Sorting->ComparePartial(Position, *item.Sorting, item.GetPosition());
+ return ApplyOptionalReverseForCompareResult(directResult);
+ }
+
std::partial_ordering Compare(const TSortableScanData& data, const ui64 dataPosition) const {
return Sorting->Compare(Position, data, dataPosition);
}
diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp
index 951cc57bfb..7a6c52b5c6 100644
--- a/ydb/core/formats/arrow/ut/ut_program_step.cpp
+++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp
@@ -149,7 +149,13 @@ struct TSumData {
}
static void CheckResult(ETest test, const std::shared_ptr<TAccessorsCollection>& batch, ui32 numKeys, bool nullable) {
- AFL_VERIFY(batch->GetColumnsCount() == numKeys + 2);
+ if (test == ETest::EMPTY) {
+ UNIT_ASSERT(!batch->HasData());
+ return;
+ } else {
+ AFL_VERIFY(batch->GetColumnsCount() == numKeys + 2);
+ }
+
auto aggXOriginal = batch->GetArrayVerified(3);
auto aggYOriginal = batch->GetArrayVerified(4);
auto colXOriginal = batch->GetArrayVerified(1);
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h
index 8de06ab4cf..d3074fb2bf 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.h
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.h
@@ -168,7 +168,6 @@ private:
}
virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const override {
- AFL_VERIFY(!!PrimaryKey);
return PrimaryKey;
}
@@ -242,7 +241,7 @@ private:
if (SourceId != entity.GetEntityId()) {
return false;
}
- AFL_VERIFY(RecordIndex <= entity.GetEntityRecordsCount());
+ AFL_VERIFY(RecordIndex <= entity.GetEntityRecordsCount())("index", RecordIndex)("count", entity.GetEntityRecordsCount());
usage = RecordIndex < entity.GetEntityRecordsCount();
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
index 37ba57b899..6968fd1be7 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
@@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NReader {
class TScanIteratorBase {
protected:
- virtual void DoOnSentDataFromInterval(const ui32 /*intervalIdx*/) const {
+ virtual void DoOnSentDataFromInterval(const TPartialSourceAddress& /*intervalAddress*/) {
}
public:
@@ -21,9 +21,9 @@ public:
virtual const TReadStats& GetStats() const;
- void OnSentDataFromInterval(const std::optional<ui32> intervalIdx) const {
- if (intervalIdx) {
- DoOnSentDataFromInterval(*intervalIdx);
+ void OnSentDataFromInterval(const std::optional<TPartialSourceAddress>& intervalAddress) {
+ if (intervalAddress) {
+ DoOnSentDataFromInterval(*intervalAddress);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
index 22bb1ce139..c9a63b0bf5 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
@@ -13,6 +13,7 @@
namespace NKikimr::NOlap::NReader {
class TPartialReadResult;
+class TPartialSourceAddress;
class TComputeShardingPolicy {
private:
@@ -170,7 +171,7 @@ public:
Started = true;
return DoStart();
}
- virtual void OnSentDataFromInterval(const ui32 intervalIdx) const = 0;
+ virtual void OnSentDataFromInterval(const TPartialSourceAddress& address) = 0;
const TReadContext& GetContext() const {
return *Context;
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
index c424c84b9a..f847b6d177 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
@@ -265,7 +265,7 @@ bool TColumnShardScan::ProduceResults() noexcept {
}
Result->LastCursorProto = CurrentLastReadKey->SerializeToProto();
SendResult(false, false);
- ScanIterator->OnSentDataFromInterval(result.GetNotFinishedIntervalIdx());
+ ScanIterator->OnSentDataFromInterval(result.GetNotFinishedInterval());
ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.cpp b/ydb/core/tx/columnshard/engines/reader/common/result.cpp
index 92f55f3dfc..38805a9982 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/result.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common/result.cpp
@@ -55,12 +55,12 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult
TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
- const std::optional<ui32> notFinishedIntervalIdx)
+ const std::optional<TPartialSourceAddress> notFinishedInterval)
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
- , NotFinishedIntervalIdx(notFinishedIntervalIdx)
+ , NotFinishedInterval(notFinishedInterval)
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(ScanCursor);
diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h
index f4e7d7d4b1..6fff2c8608 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/common/result.h
@@ -12,6 +12,22 @@ namespace NKikimr::NOlap::NReader {
class TReadContext;
+class TPartialSourceAddress {
+private:
+ YDB_READONLY(ui32, SourceId, 0);
+ YDB_READONLY(ui32, SourceIdx, 0);
+ YDB_READONLY(ui32, SyncPointIndex, 0);
+
+public:
+ TPartialSourceAddress(const ui32 sourceId, const ui32 sourceIdx, const ui32 syncPointIndex)
+ : SourceId(sourceId)
+ , SourceIdx(sourceIdx)
+ , SyncPointIndex(syncPointIndex)
+ {
+
+ }
+};
+
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult: public TNonCopyable {
private:
@@ -22,7 +38,7 @@ private:
// This 1-row batch contains the last key that was read while producing the ResultBatch.
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
std::shared_ptr<IScanCursor> ScanCursor;
- YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
+ YDB_READONLY_DEF(std::optional<TPartialSourceAddress>, NotFinishedInterval);
const NColumnShard::TCounterGuard Guard;
public:
@@ -61,11 +77,11 @@ public:
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
- const std::optional<ui32> notFinishedIntervalIdx);
+ const std::optional<TPartialSourceAddress> notFinishedInterval);
explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
- const std::shared_ptr<TReadContext>& context, const std::optional<ui32> notFinishedIntervalIdx)
- : TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedIntervalIdx) {
+ const std::shared_ptr<TReadContext>& context, const std::optional<TPartialSourceAddress> notFinishedInterval)
+ : TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedInterval) {
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp
index 800ef6841e..6d40278016 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp
@@ -10,7 +10,7 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSu
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("fbf"));
Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData()));
AFL_VERIFY(Step.Next());
- auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task, Context->GetCommonContext()->GetConveyorProcessId());
}
@@ -54,7 +54,7 @@ void TColumnsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::N
for (auto&& i : DataFetchers) {
Source->MutableStageData().AddFetcher(i.second);
}
- auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), Source->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), Source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task, Source->GetContext()->GetCommonContext()->GetConveyorProcessId());
} else {
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("cf_next"));
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
index 51bfe780e3..6f9219bc9f 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp
@@ -55,7 +55,7 @@ bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr
}
Step.Next();
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, data->AddEvent("fmalloc"));
- auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task, data->GetContext()->GetCommonContext()->GetConveyorProcessId());
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h
index 6551b39c27..e80f376d79 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h
@@ -149,8 +149,8 @@ public:
}
}
- bool IsEmptyFiltered() const {
- return Table->IsEmptyFiltered();
+ bool IsEmptyWithData() const {
+ return Table->HasDataAndResultIsEmpty();
}
void Clear() {
@@ -177,12 +177,35 @@ public:
}
};
+class TSourceChunkToReply {
+private:
+ YDB_READONLY(ui32, StartIndex, 0);
+ YDB_READONLY(ui32, RecordsCount, 0);
+ std::shared_ptr<arrow::Table> Table;
+
+public:
+ const std::shared_ptr<arrow::Table>& GetTable() const {
+ AFL_VERIFY(Table);
+ return Table;
+ }
+
+ bool HasData() const {
+ return !!Table && Table->num_rows();
+ }
+
+ TSourceChunkToReply(const ui32 startIndex, const ui32 recordsCount, const std::shared_ptr<arrow::Table>& table)
+ : StartIndex(startIndex)
+ , RecordsCount(recordsCount)
+ , Table(table) {
+ }
+};
+
class TFetchedResult {
private:
YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Batch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedFilter);
std::optional<std::deque<TPortionDataAccessor::TReadPage>> PagesToResult;
- std::optional<std::shared_ptr<arrow::Table>> ChunkToReply;
+ std::optional<TSourceChunkToReply> ChunkToReply;
TFetchedResult() = default;
@@ -225,7 +248,7 @@ public:
AFL_VERIFY(page.GetIndexStart() == indexStart)("real", page.GetIndexStart())("expected", indexStart);
AFL_VERIFY(page.GetRecordsCount() == recordsCount)("real", page.GetRecordsCount())("expected", recordsCount);
AFL_VERIFY(!ChunkToReply);
- ChunkToReply = std::move(table);
+ ChunkToReply = TSourceChunkToReply(indexStart, recordsCount, std::move(table));
}
bool IsFinished() const {
@@ -236,7 +259,7 @@ public:
return !!ChunkToReply;
}
- std::shared_ptr<arrow::Table> ExtractResultChunk() {
+ std::optional<TSourceChunkToReply> ExtractResultChunk() {
AFL_VERIFY(!!ChunkToReply);
auto result = std::move(*ChunkToReply);
ChunkToReply.reset();
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
index f36ebb1a81..4319b0f944 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
@@ -15,6 +15,7 @@ namespace NKikimr::NOlap::NReader::NCommon {
bool TStepAction::DoApply(IDataReader& owner) const {
if (FinishedFlag) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
+ Source->StartSyncSection();
Source->OnSourceFetchingFinishedSafe(owner, Source);
}
return true;
@@ -35,11 +36,17 @@ TConclusionStatus TStepAction::DoExecuteImpl() {
return TConclusionStatus::Success();
}
-TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId)
+TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId,
+ const bool changeSyncSection)
: TBase(ownerActorId)
, Source(source)
, Cursor(std::move(cursor))
, CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) {
+ if (changeSyncSection) {
+ Source->StartAsyncSection();
+ } else {
+ Source->CheckAsyncSection();
+ }
}
TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSource>& source) {
@@ -48,14 +55,16 @@ TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSour
Script->OnExecute();
AFL_VERIFY(!Script->IsFinished(CurrentStepIdx));
while (!Script->IsFinished(CurrentStepIdx)) {
- if (source->HasStageData() && source->GetStageData().IsEmptyFiltered()) {
+ if (source->HasStageData() && source->GetStageData().IsEmptyWithData()) {
source->OnEmptyStageData(source);
break;
+ } else if (source->HasStageResult() && source->GetStageResult().IsEmpty()) {
+ break;
}
auto step = Script->GetStep(CurrentStepIdx);
TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(),
IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx)("source_id", source->GetSourceId());
const TMonotonic startInstant = TMonotonic::Now();
const TConclusion<bool> resultStep = step->ExecuteInplace(source, *this);
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
index 952c3cda3a..2c5ff6b50d 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
@@ -331,10 +331,11 @@ public:
}
template <class T>
- TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId)
- : TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId) {
+ TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, const bool changeSyncSection)
+ : TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId, changeSyncSection) {
}
- TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId);
+ TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId,
+ const bool changeSyncSection);
};
class TProgramStep: public IFetchingStep {
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp
index 609773c897..21a7e016ba 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp
@@ -24,8 +24,8 @@ TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
return IndexedData->ReadNextInterval();
}
-void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const {
- return IndexedData->OnSentDataFromInterval(intervalIdx);
+void TColumnShardScanIterator::DoOnSentDataFromInterval(const TPartialSourceAddress& address) {
+ return IndexedData->OnSentDataFromInterval(address);
}
TColumnShardScanIterator::~TColumnShardScanIterator() {
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h
index cd570090aa..f525af31b9 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h
@@ -55,7 +55,7 @@ public:
class TColumnShardScanIterator: public TScanIteratorBase {
private:
- virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override;
+ virtual void DoOnSentDataFromInterval(const TPartialSourceAddress& address) override;
protected:
ui64 ItemsRead = 0;
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h
index c8c62b09af..afdc0b4e36 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h
@@ -116,6 +116,7 @@ public:
class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource {
private:
+ TAtomic SyncSectionFlag = 1;
YDB_READONLY(ui64, SourceId, 0);
YDB_READONLY(ui32, SourceIdx, 0);
YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero());
@@ -161,6 +162,22 @@ protected:
std::unique_ptr<TFetchedResult> StageResult;
public:
+ void StartAsyncSection() {
+ AFL_VERIFY(AtomicCas(&SyncSectionFlag, 0, 1));
+ }
+
+ void CheckAsyncSection() {
+ AFL_VERIFY(AtomicGet(SyncSectionFlag) == 0);
+ }
+
+ void StartSyncSection() {
+ AFL_VERIFY(AtomicCas(&SyncSectionFlag, 1, 0));
+ }
+
+ bool IsSyncSection() const {
+ return AtomicGet(SyncSectionFlag) == 1;
+ }
+
void AddEvent(const TString& evDescription) {
AFL_VERIFY(!!Events);
Events->AddEvent(evDescription);
@@ -274,6 +291,10 @@ public:
return DoStartFetchingColumns(sourcePtr, step, columns);
}
+ void ResetSourceFinishedFlag() {
+ AFL_VERIFY(AtomicCas(&SourceFinishedSafeFlag, 0, 1));
+ }
+
void OnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr<IDataSource>& sourcePtr) {
AFL_VERIFY(AtomicCas(&SourceFinishedSafeFlag, 1, 0));
AFL_VERIFY(sourcePtr);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
index 8636058113..bb355defb1 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
@@ -87,7 +87,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour
auto plan = source->GetContext()->GetColumnsFetchingPlan(source);
source->InitFetchingPlan(plan);
TFetchingScriptCursor cursor(plan, 0);
- auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
return false;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
index 960f49541b..2b2f5e250f 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
@@ -45,8 +45,8 @@ protected:
}
public:
- virtual void OnSentDataFromInterval(const ui32 intervalIdx) const override {
- Scanner->OnSentDataFromInterval(intervalIdx);
+ virtual void OnSentDataFromInterval(const TPartialSourceAddress& address) override {
+ Scanner->OnSentDataFromInterval(address);
}
template <class T>
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
index b0e489fa74..076dd63d49 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
@@ -21,10 +21,10 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result_received")("interval_idx", intervalIdx)(
"intervalId", itInterval->second->GetIntervalId());
if (newBatch && newBatch->GetRecordsCount()) {
- std::optional<ui32> callbackIdxSubscriver;
+ std::optional<TPartialSourceAddress> callbackIdxSubscriver;
std::shared_ptr<NGroupedMemoryManager::TGroupGuard> gGuard;
if (itInterval->second->HasMerger()) {
- callbackIdxSubscriver = intervalIdx;
+ callbackIdxSubscriver = TPartialSourceAddress(itInterval->second->GetIntervalId(), intervalIdx, 0);
} else {
gGuard = itInterval->second->GetGroupGuard();
}
@@ -203,4 +203,13 @@ void TScanHead::Abort() {
Y_ABORT_UNLESS(IsFinished());
}
+void TScanHead::OnSentDataFromInterval(const TPartialSourceAddress& address) const {
+ if (Context->IsAborted()) {
+ return;
+ }
+ auto it = FetchingIntervals.find(address.GetSourceIdx());
+ AFL_VERIFY(it != FetchingIntervals.end())("interval_idx", address.GetSourceIdx())("count", FetchingIntervals.size());
+ it->second->OnPartSendingComplete();
+}
+
} // namespace NKikimr::NOlap::NReader::NPlain
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
index 09649e7881..23888d4108 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
@@ -84,14 +84,7 @@ private:
void DrainSources();
[[nodiscard]] TConclusionStatus DetectSourcesFeatureInContextIntervalScan(const THashMap<ui32, std::shared_ptr<IDataSource>>& intervalSources, const bool isExclusiveInterval) const;
public:
- void OnSentDataFromInterval(const ui32 intervalIdx) const {
- if (Context->IsAborted()) {
- return;
- }
- auto it = FetchingIntervals.find(intervalIdx);
- AFL_VERIFY(it != FetchingIntervals.end())("interval_idx", intervalIdx)("count", FetchingIntervals.size());
- it->second->OnPartSendingComplete();
- }
+ void OnSentDataFromInterval(const TPartialSourceAddress& address) const;
bool IsReverse() const;
void Abort();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
index c0300c23ae..9f74bc79c6 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
@@ -36,7 +36,7 @@ void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::share
return;
}
TFetchingScriptCursor cursor(FetchingPlan, 0);
- auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
}
@@ -170,7 +170,7 @@ private:
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front()));
AFL_VERIFY(Step.Next());
- auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
@@ -219,7 +219,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
const ISnapshotSchema::TPtr batchSchema =
GetContext()->GetReadMetadata()->GetIndexVersions().GetSchemaVerified(GetCommitted().GetSchemaVersion());
const ISnapshotSchema::TPtr resultSchema = GetContext()->GetReadMetadata()->GetResultSchema();
- if (!GetStageData().GetTable()->HasAccessors()) {
+ if (!AssembledFlag) {
+ AssembledFlag = true;
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
index 3823b0a1cc..69cce115df 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
@@ -308,6 +308,7 @@ private:
using TBase = IDataSource;
TCommittedBlob CommittedBlob;
bool ReadStarted = false;
+ bool AssembledFlag = false;
virtual void DoAbort() override {
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp
deleted file mode 100644
index 1fb5b3dd31..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.cpp
+++ /dev/null
@@ -1,115 +0,0 @@
-#include "collections.h"
-
-#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
-
-namespace NKikimr::NOlap::NReader::NSimple {
-
-std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() {
- AFL_VERIFY(HeapSources.size());
- std::pop_heap(HeapSources.begin(), HeapSources.end());
- auto result = HeapSources.back().Construct(Context);
- AFL_VERIFY(FetchingInFlightSources.emplace(TCompareKeyForScanSequence::FromFinish(result)).second);
- auto predPosition = std::move(HeapSources.back());
- HeapSources.pop_back();
- FetchingInFlightCount.Inc();
- return result;
-}
-
-void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) {
- if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) {
- InFlightLimit = 2 * InFlightLimit;
- }
- FetchingInFlightCount.Dec();
- AFL_VERIFY(FetchingInFlightSources.erase(TCompareKeyForScanSequence::FromFinish(source)));
- while (FinishedSources.size() && (HeapSources.empty() || FinishedSources.begin()->first < HeapSources.front().GetStart())) {
- auto finishedSource = FinishedSources.begin()->second;
- FetchedCount += finishedSource.GetRecordsCount();
- FinishedSources.erase(FinishedSources.begin());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource.GetSourceId())(
- "source_idx", finishedSource.GetSourceIdx())("limit", Limit)("fetched", finishedSource.GetRecordsCount());
- if (Limit <= FetchedCount && HeapSources.size()) {
- AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
- HeapSources.clear();
- }
- }
-}
-
-ui32 TScanWithLimitCollection::GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const {
- AFL_VERIFY(from < to);
- ui32 inFlightCountLocal = 0;
- {
- auto itFinishedFrom = FinishedSources.lower_bound(from);
- auto itFinishedTo = FinishedSources.lower_bound(to);
- for (auto&& it = itFinishedFrom; it != itFinishedTo; ++it) {
- ++inFlightCountLocal;
- }
- }
- {
- auto itFetchingFrom = FetchingInFlightSources.lower_bound(from);
- auto itFetchingTo = FetchingInFlightSources.lower_bound(to);
- for (auto&& it = itFetchingFrom; it != itFetchingTo; ++it) {
- ++inFlightCountLocal;
- }
- }
- return inFlightCountLocal;
-}
-
-TScanWithLimitCollection::TScanWithLimitCollection(
- const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, const std::shared_ptr<IScanCursor>& cursor)
- : TBase(context)
- , Limit((ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) {
- if (cursor && cursor->IsInitialized()) {
- for (auto&& i : sources) {
- bool usage = false;
- if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) {
- continue;
- }
- if (usage) {
- i.SetIsStartedByCursor();
- }
- break;
- }
- }
-
- HeapSources = std::move(sources);
- std::make_heap(HeapSources.begin(), HeapSources.end());
-}
-
-void TScanWithLimitCollection::DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
- std::vector<std::shared_ptr<arrow::ChunkedArray>> pkArrays;
- for (auto&& f : Context->GetReadMetadata()->GetResultSchema()->GetIndexInfo().GetReplaceKey()->fields()) {
- pkArrays.emplace_back(table->GetColumnByName(f->name()));
- if (!pkArrays.back()) {
- pkArrays.pop_back();
- break;
- }
- }
- AFL_VERIFY(pkArrays.size());
- const ui32 partsCount = std::min<ui32>(10, table->num_rows());
- std::optional<i32> lastPosition;
- for (ui32 i = 0; i < partsCount; ++i) {
- const i32 currentPosition = (i + 1) * (table->num_rows() - 1) / partsCount;
- if (lastPosition) {
- AFL_VERIFY(*lastPosition < currentPosition);
- }
- const i64 size = lastPosition ? (currentPosition - *lastPosition) : currentPosition;
- lastPosition = currentPosition;
- TReplaceKeyAdapter key(NArrow::TComparablePosition(pkArrays, currentPosition), Context->GetReadMetadata()->IsDescSorted());
- TCompareKeyForScanSequence finishPos(key, source->GetSourceId());
- AFL_VERIFY(FinishedSources.emplace(finishPos, TFinishedDataSource(source, size)).second);
- }
-}
-
-ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context)
- : Context(context) {
- if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
- MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
- }
-}
-
-std::shared_ptr<NKikimr::NOlap::IScanCursor> TNotSortedCollection::DoBuildCursor(
- const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
- return std::make_shared<TNotSortedSimpleScanCursor>(source->GetSourceId(), readyRecords);
-}
-
-} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h
deleted file mode 100644
index b497c2d761..0000000000
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections.h
+++ /dev/null
@@ -1,244 +0,0 @@
-#pragma once
-#include "context.h"
-#include "source.h"
-
-#include <ydb/library/accessor/positive_integer.h>
-
-namespace NKikimr::NOlap::NReader::NSimple {
-
-class ISourcesCollection {
-private:
- virtual bool DoIsFinished() const = 0;
- virtual std::shared_ptr<IDataSource> DoExtractNext() = 0;
- virtual bool DoCheckInFlightLimits() const = 0;
- virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0;
- virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) = 0;
- virtual void DoClear() = 0;
-
- TPositiveControlInteger SourcesInFlightCount;
- YDB_READONLY(ui64, MaxInFlight, 1024);
-
- virtual TString DoDebugString() const {
- return "";
- }
- virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const = 0;
-
-protected:
- const std::shared_ptr<TSpecialReadContext> Context;
-
-public:
- std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
- return DoBuildCursor(source, readyRecords);
- }
-
- void OnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) {
- return DoOnIntervalResult(table, source);
- }
-
- TString DebugString() const {
- return DoDebugString();
- }
-
- virtual ~ISourcesCollection() = default;
-
- std::shared_ptr<IDataSource> ExtractNext() {
- SourcesInFlightCount.Inc();
- return DoExtractNext();
- }
-
- bool IsFinished() const {
- return DoIsFinished();
- }
-
- void OnSourceFinished(const std::shared_ptr<IDataSource>& source) {
- AFL_VERIFY(source);
- SourcesInFlightCount.Dec();
- DoOnSourceFinished(source);
- }
-
- bool CheckInFlightLimits() const {
- return DoCheckInFlightLimits();
- }
-
- void Clear() {
- DoClear();
- }
-
- ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context);
-};
-
-class TNotSortedCollection: public ISourcesCollection {
-private:
- using TBase = ISourcesCollection;
- std::optional<ui32> Limit;
- ui32 InFlightLimit = 1;
- std::deque<TSourceConstructor> Sources;
- TPositiveControlInteger InFlightCount;
- ui32 FetchedCount = 0;
- virtual void DoClear() override {
- Sources.clear();
- }
- virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override;
- virtual bool DoIsFinished() const override {
- return Sources.empty();
- }
- virtual std::shared_ptr<IDataSource> DoExtractNext() override {
- AFL_VERIFY(Sources.size());
- auto result = Sources.front().Construct(Context);
- Sources.pop_front();
- InFlightCount.Inc();
- return result;
- }
- virtual bool DoCheckInFlightLimits() const override {
- return InFlightCount < InFlightLimit;
- }
- virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
- }
- virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override {
- if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) {
- InFlightLimit *= 2;
- }
- FetchedCount += source->GetResultRecordsCount();
- if (Limit && *Limit <= FetchedCount && Sources.size()) {
- AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
- Sources.clear();
- }
- InFlightCount.Dec();
- }
-
-public:
- TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
- const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
- : TBase(context)
- , Limit(limit) {
- if (Limit) {
- InFlightLimit = 1;
- } else {
- InFlightLimit = GetMaxInFlight();
- }
- if (cursor && cursor->IsInitialized()) {
- while (sources.size()) {
- bool usage = false;
- if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(sources.front(), usage)) {
- sources.pop_front();
- continue;
- }
- if (usage) {
- sources.front().SetIsStartedByCursor();
- }
- break;
- }
- }
- Sources = std::move(sources);
- }
-};
-
-class TSortedFullScanCollection: public ISourcesCollection {
-private:
- using TBase = ISourcesCollection;
- std::deque<TSourceConstructor> HeapSources;
- TPositiveControlInteger InFlightCount;
- virtual void DoClear() override {
- HeapSources.clear();
- }
- virtual bool DoIsFinished() const override {
- return HeapSources.empty();
- }
- virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
- return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
- }
- virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& /*table*/, const std::shared_ptr<IDataSource>& /*source*/) override {
- }
- virtual std::shared_ptr<IDataSource> DoExtractNext() override {
- AFL_VERIFY(HeapSources.size());
- auto result = HeapSources.front().Construct(Context);
- std::pop_heap(HeapSources.begin(), HeapSources.end());
- HeapSources.pop_back();
- InFlightCount.Inc();
- return result;
- }
- virtual bool DoCheckInFlightLimits() const override {
- return InFlightCount < GetMaxInFlight();
- }
- virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& /*source*/) override {
- InFlightCount.Dec();
- }
-
-public:
- TSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
- const std::shared_ptr<IScanCursor>& cursor)
- : TBase(context) {
- if (cursor && cursor->IsInitialized()) {
- for (auto&& i : sources) {
- bool usage = false;
- if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) {
- continue;
- }
- if (usage) {
- i.SetIsStartedByCursor();
- }
- break;
- }
- }
- HeapSources = std::move(sources);
- std::make_heap(HeapSources.begin(), HeapSources.end());
- }
-};
-
-class TScanWithLimitCollection: public ISourcesCollection {
-private:
- using TBase = ISourcesCollection;
- class TFinishedDataSource {
- private:
- YDB_READONLY(ui32, RecordsCount, 0);
- YDB_READONLY(ui32, SourceId, 0);
- YDB_READONLY(ui32, SourceIdx, 0);
-
- public:
- TFinishedDataSource(const std::shared_ptr<IDataSource>& source)
- : RecordsCount(source->GetResultRecordsCount())
- , SourceId(source->GetSourceId())
- , SourceIdx(source->GetSourceIdx()) {
- }
-
- TFinishedDataSource(const std::shared_ptr<IDataSource>& source, const ui32 partSize)
- : RecordsCount(partSize)
- , SourceId(source->GetSourceId())
- , SourceIdx(source->GetSourceIdx()) {
- AFL_VERIFY(partSize < source->GetResultRecordsCount());
- }
- };
-
- std::deque<TSourceConstructor> HeapSources;
- TPositiveControlInteger FetchingInFlightCount;
- TPositiveControlInteger FullIntervalsFetchingCount;
- ui64 Limit = 0;
- ui64 InFlightLimit = 1;
- ui64 FetchedCount = 0;
- std::map<TCompareKeyForScanSequence, TFinishedDataSource> FinishedSources;
- std::set<TCompareKeyForScanSequence> FetchingInFlightSources;
-
- virtual void DoOnIntervalResult(const std::shared_ptr<arrow::Table>& table, const std::shared_ptr<IDataSource>& source) override;
- virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
- return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
- }
- virtual void DoClear() override {
- HeapSources.clear();
- }
- virtual bool DoIsFinished() const override {
- return HeapSources.empty();
- }
- virtual std::shared_ptr<IDataSource> DoExtractNext() override;
- virtual bool DoCheckInFlightLimits() const override {
- return (FetchingInFlightCount < InFlightLimit);
- //&&(FullIntervalsFetchingCount < InFlightLimit);
- }
- virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override;
- ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const;
-
-public:
- TScanWithLimitCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
- const std::shared_ptr<IScanCursor>& cursor);
-};
-
-} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp
new file mode 100644
index 0000000000..d40f3ee3b2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.cpp
@@ -0,0 +1,14 @@
+#include "abstract.h"
+
+#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+ISourcesCollection::ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context)
+ : Context(context) {
+ if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
+ MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
+ }
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h
new file mode 100644
index 0000000000..1c5f00d8dc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h
@@ -0,0 +1,77 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h>
+
+#include <ydb/library/accessor/positive_integer.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class ISourcesCollection {
+private:
+ virtual bool DoIsFinished() const = 0;
+ virtual std::shared_ptr<IDataSource> DoExtractNext() = 0;
+ virtual bool DoCheckInFlightLimits() const = 0;
+ virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) = 0;
+ virtual void DoClear() = 0;
+ virtual void DoAbort() = 0;
+
+ TPositiveControlInteger SourcesInFlightCount;
+ YDB_READONLY(ui64, MaxInFlight, 1024);
+
+ virtual TString DoDebugString() const {
+ return "";
+ }
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const = 0;
+ virtual bool DoHasData() const = 0;
+
+protected:
+ const std::shared_ptr<TSpecialReadContext> Context;
+
+public:
+ bool HasData() const {
+ return DoHasData();
+ }
+
+ std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
+ AFL_VERIFY(source);
+ AFL_VERIFY(readyRecords <= source->GetRecordsCount())("count", source->GetRecordsCount())("ready", readyRecords);
+ return DoBuildCursor(source, readyRecords);
+ }
+
+ TString DebugString() const {
+ return DoDebugString();
+ }
+
+ virtual ~ISourcesCollection() = default;
+
+ std::shared_ptr<IDataSource> ExtractNext() {
+ SourcesInFlightCount.Inc();
+ return DoExtractNext();
+ }
+
+ bool IsFinished() const {
+ return DoIsFinished();
+ }
+
+ void OnSourceFinished(const std::shared_ptr<IDataSource>& source) {
+ AFL_VERIFY(source);
+ SourcesInFlightCount.Dec();
+ DoOnSourceFinished(source);
+ }
+
+ bool CheckInFlightLimits() const {
+ return DoCheckInFlightLimits();
+ }
+
+ void Clear() {
+ DoClear();
+ }
+
+ void Abort() {
+ DoAbort();
+ }
+
+ ISourcesCollection(const std::shared_ptr<TSpecialReadContext>& context);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp
new file mode 100644
index 0000000000..a24f1158b6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.cpp
@@ -0,0 +1,5 @@
+#include "full_scan_sorted.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h
new file mode 100644
index 0000000000..9fe6771159
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/full_scan_sorted.h
@@ -0,0 +1,65 @@
+#pragma once
+#include "abstract.h"
+
+#include <ydb/library/accessor/positive_integer.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TSortedFullScanCollection: public ISourcesCollection {
+private:
+ using TBase = ISourcesCollection;
+ std::deque<TSourceConstructor> HeapSources;
+ TPositiveControlInteger InFlightCount;
+ ui32 SourceIdx = 0;
+ virtual void DoClear() override {
+ HeapSources.clear();
+ }
+ virtual bool DoHasData() const override {
+ return HeapSources.size();
+ }
+ virtual void DoAbort() override {
+ HeapSources.clear();
+ }
+ virtual bool DoIsFinished() const override {
+ return HeapSources.empty();
+ }
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
+ return std::make_shared<TSimpleScanCursor>(source->GetStartPKRecordBatch(), source->GetSourceId(), readyRecords);
+ }
+ virtual std::shared_ptr<IDataSource> DoExtractNext() override {
+ AFL_VERIFY(HeapSources.size());
+ auto result = HeapSources.front().Construct(SourceIdx++, Context);
+ std::pop_heap(HeapSources.begin(), HeapSources.end());
+ HeapSources.pop_back();
+ InFlightCount.Inc();
+ return result;
+ }
+ virtual bool DoCheckInFlightLimits() const override {
+ return InFlightCount < GetMaxInFlight();
+ }
+ virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& /*source*/) override {
+ InFlightCount.Dec();
+ }
+
+public:
+ TSortedFullScanCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
+ const std::shared_ptr<IScanCursor>& cursor)
+ : TBase(context) {
+ if (cursor && cursor->IsInitialized()) {
+ for (auto&& i : sources) {
+ bool usage = false;
+ if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) {
+ continue;
+ }
+ if (usage) {
+ i.SetIsStartedByCursor();
+ }
+ break;
+ }
+ }
+ HeapSources = std::move(sources);
+ std::make_heap(HeapSources.begin(), HeapSources.end());
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp
new file mode 100644
index 0000000000..33b2c050e5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.cpp
@@ -0,0 +1,51 @@
+#include "limit_sorted.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+std::shared_ptr<IDataSource> TScanWithLimitCollection::DoExtractNext() {
+ AFL_VERIFY(HeapSources.size());
+ std::pop_heap(HeapSources.begin(), HeapSources.end());
+ auto result = NextSource ? NextSource : HeapSources.back().Construct(SourceIdxCurrent++, Context);
+ AFL_VERIFY(FetchingInFlightSources.emplace(result->GetSourceId()).second);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractNext")("source_id", result->GetSourceId());
+ HeapSources.pop_back();
+ if (HeapSources.size()) {
+ NextSource = HeapSources.front().Construct(SourceIdxCurrent++, Context);
+ } else {
+ NextSource = nullptr;
+ }
+ return result;
+}
+
+void TScanWithLimitCollection::DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoOnSourceFinished")("source_id", source->GetSourceId())("limit", Limit)(
+ "max", GetMaxInFlight())("in_flight_limit", InFlightLimit)("count", FetchingInFlightSources.size());
+ if (!source->GetResultRecordsCount() && InFlightLimit < GetMaxInFlight()) {
+ InFlightLimit = 2 * InFlightLimit;
+ }
+ AFL_VERIFY(Cleared || Aborted || FetchingInFlightSources.erase(source->GetSourceId()))("source_id", source->GetSourceId());
+}
+
+TScanWithLimitCollection::TScanWithLimitCollection(
+ const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources, const std::shared_ptr<IScanCursor>& cursor)
+ : TBase(context)
+ , Limit((ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) {
+ HeapSources = std::move(sources);
+ std::make_heap(HeapSources.begin(), HeapSources.end());
+ if (cursor && cursor->IsInitialized()) {
+ while (HeapSources.size()) {
+ bool usage = false;
+ if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(HeapSources.front(), usage)) {
+ std::pop_heap(HeapSources.begin(), HeapSources.end());
+ HeapSources.pop_back();
+ continue;
+ }
+ if (usage) {
+ HeapSources.front().SetIsStartedByCursor();
+ }
+ break;
+ }
+ }
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h
new file mode 100644
index 0000000000..e04b105964
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h
@@ -0,0 +1,79 @@
+#pragma once
+#include "abstract.h"
+
+#include <ydb/library/accessor/positive_integer.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TScanWithLimitCollection: public ISourcesCollection {
+private:
+ using TBase = ISourcesCollection;
+ class TFinishedDataSource {
+ private:
+ YDB_READONLY(ui32, RecordsCount, 0);
+ YDB_READONLY(ui32, SourceId, 0);
+ YDB_READONLY(ui32, SourceIdx, 0);
+
+ public:
+ TFinishedDataSource(const std::shared_ptr<IDataSource>& source)
+ : RecordsCount(source->GetResultRecordsCount())
+ , SourceId(source->GetSourceId())
+ , SourceIdx(source->GetSourceIdx()) {
+ }
+
+ TFinishedDataSource(const std::shared_ptr<IDataSource>& source, const ui32 partSize)
+ : RecordsCount(partSize)
+ , SourceId(source->GetSourceId())
+ , SourceIdx(source->GetSourceIdx()) {
+ AFL_VERIFY(partSize < source->GetResultRecordsCount());
+ }
+ };
+
+ virtual bool DoHasData() const override {
+ return HeapSources.size();
+ }
+ ui32 SourceIdxCurrent = 0;
+ std::shared_ptr<IDataSource> NextSource;
+ std::deque<TSourceConstructor> HeapSources;
+ ui64 Limit = 0;
+ ui64 InFlightLimit = 1;
+ std::set<ui32> FetchingInFlightSources;
+ bool Aborted = false;
+ bool Cleared = false;
+
+ void DrainToLimit();
+
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override {
+ return std::make_shared<TSimpleScanCursor>(nullptr, source->GetSourceId(), readyRecords);
+ }
+ virtual void DoClear() override {
+ Cleared = true;
+ HeapSources.clear();
+ FetchingInFlightSources.clear();
+ }
+ virtual void DoAbort() override {
+ Aborted = true;
+ HeapSources.clear();
+ FetchingInFlightSources.clear();
+ }
+ virtual bool DoIsFinished() const override {
+ return HeapSources.empty() && FetchingInFlightSources.empty();
+ }
+ virtual std::shared_ptr<IDataSource> DoExtractNext() override;
+ virtual bool DoCheckInFlightLimits() const override {
+ return FetchingInFlightSources.size() < InFlightLimit;
+ }
+
+ virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override;
+ ui32 GetInFlightIntervalsCount(const TCompareKeyForScanSequence& from, const TCompareKeyForScanSequence& to) const;
+
+public:
+ const std::shared_ptr<IDataSource>& GetNextSource() const {
+ return NextSource;
+ }
+
+ TScanWithLimitCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
+ const std::shared_ptr<IScanCursor>& cursor);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp
new file mode 100644
index 0000000000..8962f02633
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.cpp
@@ -0,0 +1,10 @@
+#include "not_sorted.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+ std::shared_ptr<NKikimr::NOlap::IScanCursor> TNotSortedCollection::DoBuildCursor(
+ const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
+ return std::make_shared<TNotSortedSimpleScanCursor>(source->GetSourceId(), readyRecords);
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h
new file mode 100644
index 0000000000..45b855d9db
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/not_sorted.h
@@ -0,0 +1,78 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TNotSortedCollection: public ISourcesCollection {
+private:
+ using TBase = ISourcesCollection;
+ std::optional<ui32> Limit;
+ ui32 InFlightLimit = 1;
+ std::deque<TSourceConstructor> Sources;
+ TPositiveControlInteger InFlightCount;
+ ui32 FetchedCount = 0;
+ ui32 SourceIdx = 0;
+ virtual bool DoHasData() const override {
+ return Sources.size();
+ }
+ virtual void DoClear() override {
+ Sources.clear();
+ }
+ virtual void DoAbort() override {
+ Sources.clear();
+ }
+
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const override;
+ virtual bool DoIsFinished() const override {
+ return Sources.empty();
+ }
+ virtual std::shared_ptr<IDataSource> DoExtractNext() override {
+ AFL_VERIFY(Sources.size());
+ auto result = Sources.front().Construct(SourceIdx++, Context);
+ Sources.pop_front();
+ InFlightCount.Inc();
+ return result;
+ }
+ virtual bool DoCheckInFlightLimits() const override {
+ return InFlightCount < InFlightLimit;
+ }
+ virtual void DoOnSourceFinished(const std::shared_ptr<IDataSource>& source) override {
+ if (!source->GetResultRecordsCount() && InFlightLimit * 2 < GetMaxInFlight()) {
+ InFlightLimit *= 2;
+ }
+ FetchedCount += source->GetResultRecordsCount();
+ if (Limit && *Limit <= FetchedCount && Sources.size()) {
+ AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")("limit", Limit)("fetched", FetchedCount);
+ Sources.clear();
+ }
+ InFlightCount.Dec();
+ }
+
+public:
+ TNotSortedCollection(const std::shared_ptr<TSpecialReadContext>& context, std::deque<TSourceConstructor>&& sources,
+ const std::shared_ptr<IScanCursor>& cursor, const std::optional<ui32> limit)
+ : TBase(context)
+ , Limit(limit) {
+ if (Limit) {
+ InFlightLimit = 1;
+ } else {
+ InFlightLimit = GetMaxInFlight();
+ }
+ if (cursor && cursor->IsInitialized()) {
+ while (sources.size()) {
+ bool usage = false;
+ if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(sources.front(), usage)) {
+ sources.pop_front();
+ continue;
+ }
+ if (usage) {
+ sources.front().SetIsStartedByCursor();
+ }
+ break;
+ }
+ }
+ Sources = std::move(sources);
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make
new file mode 100644
index 0000000000..640e7bbbe8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ not_sorted.cpp
+ full_scan_sorted.cpp
+ limit_sorted.cpp
+)
+
+PEERDIR(
+ ydb/core/formats/arrow
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
index 9481c3c026..7d82fa6723 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
@@ -100,7 +100,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour
source->InitFetchingPlan(plan);
TFetchingScriptCursor cursor(plan, 0);
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("sdmem"));
- auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
return false;
}
@@ -109,25 +109,19 @@ namespace {
class TApplySourceResult: public IDataTasksProcessor::ITask {
private:
using TBase = IDataTasksProcessor::ITask;
- YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, Result);
YDB_READONLY_DEF(std::shared_ptr<IDataSource>, Source);
- YDB_READONLY(ui32, StartIndex, 0);
- YDB_READONLY(ui32, OriginalRecordsCount, 0);
NColumnShard::TCounterGuard Guard;
TFetchingScriptCursor Step;
public:
- TString GetTaskClassIdentifier() const override {
+ virtual TString GetTaskClassIdentifier() const override {
return "TApplySourceResult";
}
- TApplySourceResult(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& result, const ui32 startIndex,
- const ui32 originalRecordsCount, const TFetchingScriptCursor& step)
+ TApplySourceResult(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step)
: TBase(NActors::TActorId())
- , Result(result)
, Source(source)
- , StartIndex(startIndex)
- , OriginalRecordsCount(originalRecordsCount)
, Guard(source->GetContext()->GetCommonContext()->GetCounters().GetResultsForSourceGuard())
, Step(step) {
}
@@ -138,9 +132,9 @@ public:
}
virtual bool DoApply(IDataReader& indexedDataRead) const override {
auto* plainReader = static_cast<TPlainReadData*>(&indexedDataRead);
- auto resultCopy = Result;
Source->SetCursor(Step);
- plainReader->MutableScanner().OnSourceReady(Source, std::move(resultCopy), StartIndex, OriginalRecordsCount, *plainReader);
+ Source->StartSyncSection();
+ plainReader->MutableScanner().GetResultSyncPoint()->OnSourcePrepared(Source, *plainReader);
return true;
}
};
@@ -164,33 +158,47 @@ TConclusion<bool> TBuildResultStep::DoExecuteInplace(const std::shared_ptr<IData
resultBatch = nullptr;
}
}
-
+ const ui32 recordsCount = resultBatch ? resultBatch->num_rows() : 0;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TBuildResultStep")("source_id", source->GetSourceId())("count", recordsCount);
+ context->GetCommonContext()->GetCounters().OnSourceFinished(source->GetRecordsCount(), source->GetUsedRawBytes(), recordsCount);
+ source->MutableResultRecordsCount() += recordsCount;
+ if (!resultBatch || !resultBatch->num_rows()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust());
+ }
+ source->MutableStageResult().SetResultChunk(std::move(resultBatch), StartIndex, RecordsCount);
NActors::TActivationContext::AsActorContext().Send(context->GetCommonContext()->GetScanActorId(),
- new NColumnShard::TEvPrivate::TEvTaskProcessedResult(
- std::make_shared<TApplySourceResult>(source, std::move(resultBatch), StartIndex, RecordsCount, step)));
+ new NColumnShard::TEvPrivate::TEvTaskProcessedResult(std::make_shared<TApplySourceResult>(source, step)));
return false;
}
TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
- NCommon::TFetchingScriptBuilder acc(*source->GetContext());
+ const auto context = source->GetContext();
+ NCommon::TFetchingScriptBuilder acc(*context);
if (source->IsSourceInMemory()) {
AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1);
}
+ AFL_VERIFY(!source->GetStageResult().IsEmpty());
for (auto&& i : source->GetStageResult().GetPagesToResultVerified()) {
- if (source->GetIsStartedByCursor() && !source->GetContext()->GetCommonContext()->GetScanCursor()->CheckSourceIntervalUsage(
+ if (source->GetIsStartedByCursor() && !context->GetCommonContext()->GetScanCursor()->CheckSourceIntervalUsage(
source->GetSourceId(), i.GetIndexStart(), i.GetRecordsCount())) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TPrepareResultStep_ResultStep_SKIP_CURSOR")("source_id", source->GetSourceId());
continue;
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TPrepareResultStep_ResultStep")("source_id", source->GetSourceId());
}
acc.AddStep(std::make_shared<TBuildResultStep>(i.GetIndexStart(), i.GetRecordsCount()));
}
auto plan = std::move(acc).Build();
AFL_VERIFY(!plan->IsFinished(0));
source->InitFetchingPlan(plan);
-
- TFetchingScriptCursor cursor(plan, 0);
- auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId());
- NConveyor::TScanServiceOperator::SendTaskToExecute(task);
- return false;
+ if (source->NeedFullAnswer()) {
+ TFetchingScriptCursor cursor(plan, 0);
+ auto task = std::make_shared<TStepAction>(source, std::move(cursor), context->GetCommonContext()->GetScanActorId(), false);
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ return false;
+ } else {
+ return true;
+ }
}
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
index ab4cc51d6c..898c6f92c5 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
@@ -57,4 +57,11 @@ void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>&
PartialResults.emplace_back(result);
}
+void TPlainReadData::OnSentDataFromInterval(const TPartialSourceAddress& sourceAddress) {
+ if (!SpecialReadContext->IsActive()) {
+ return;
+ }
+ Scanner->GetSyncPoint(sourceAddress.GetSyncPointIndex())->Continue(sourceAddress, *this);
+}
+
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
index adfe861d63..19c8e14ace 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
@@ -60,12 +60,7 @@ public:
TScanHead& MutableScanner() {
return *Scanner;
}
- virtual void OnSentDataFromInterval(const ui32 sourceIdx) const override {
- if (!SpecialReadContext->IsActive()) {
- return;
- }
- Scanner->ContinueSource(sourceIdx);
- }
+ virtual void OnSentDataFromInterval(const TPartialSourceAddress& sourceAddress) override;
void OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result);
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
index deecda0487..94602e9878 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
@@ -1,6 +1,12 @@
#include "plain_read_data.h"
#include "scanner.h"
+#include "collections/full_scan_sorted.h"
+#include "collections/limit_sorted.h"
+#include "collections/not_sorted.h"
+#include "sync_points/limit.h"
+#include "sync_points/result.h"
+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
@@ -8,59 +14,6 @@
namespace NKikimr::NOlap::NReader::NSimple {
-void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& tableExt, const ui32 startIndex,
- const ui32 recordsCount, TPlainReadData& reader) {
- FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("f"));
- AFL_DEBUG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG)("event_log", source->GetEventsReport())("count", FetchingSources.size());
- source->MutableResultRecordsCount() += tableExt ? tableExt->num_rows() : 0;
- if (!tableExt || !tableExt->num_rows()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust());
- }
- Context->GetCommonContext()->GetCounters().OnSourceFinished(
- source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0);
-
- source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount);
- while (FetchingSources.size()) {
- auto frontSource = FetchingSources.front();
- if (!frontSource->HasStageResult()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())(
- "source_idx", frontSource->GetSourceIdx());
- break;
- }
- if (!frontSource->GetStageResult().HasResultChunk()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())(
- "source_idx", frontSource->GetSourceIdx());
- break;
- }
- auto table = frontSource->MutableStageResult().ExtractResultChunk();
- const bool isFinished = frontSource->GetStageResult().IsFinished();
- std::optional<ui32> sourceIdxToContinue;
- if (!isFinished) {
- sourceIdxToContinue = frontSource->GetSourceIdx();
- }
- if (table && table->num_rows()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())(
- "source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
- auto cursor = SourcesCollection->BuildCursor(frontSource, startIndex + recordsCount);
- reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table,
- cursor, Context->GetCommonContext(), sourceIdxToContinue));
- SourcesCollection->OnIntervalResult(table, frontSource);
- } else if (sourceIdxToContinue) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
- "source_idx", frontSource->GetSourceIdx());
- ContinueSource(*sourceIdxToContinue);
- break;
- }
- if (!isFinished) {
- break;
- }
- AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
- FetchingSources.pop_front();
- frontSource->ClearResult();
- SourcesCollection->OnSourceFinished(frontSource);
- }
-}
-
TConclusionStatus TScanHead::Start() {
return TConclusionStatus::Success();
}
@@ -69,29 +22,30 @@ TScanHead::TScanHead(std::deque<TSourceConstructor>&& sources, const std::shared
: Context(context) {
if (Context->GetReadMetadata()->IsSorted()) {
if (Context->GetReadMetadata()->HasLimit()) {
- SourcesCollection =
- std::make_unique<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
+ auto collection =
+ std::make_shared<TScanWithLimitCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
+ SourcesCollection = collection;
+ SyncPoints.emplace_back(std::make_shared<TSyncPointLimitControl>(
+ (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust(), SyncPoints.size(), context, collection));
} else {
SourcesCollection =
- std::make_unique<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
+ std::make_shared<TSortedFullScanCollection>(Context, std::move(sources), context->GetCommonContext()->GetScanCursor());
}
} else {
- SourcesCollection = std::make_unique<TNotSortedCollection>(
+ SourcesCollection = std::make_shared<TNotSortedCollection>(
Context, std::move(sources), context->GetCommonContext()->GetScanCursor(), Context->GetReadMetadata()->GetLimitRobustOptional());
}
+ SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
+ for (ui32 i = 0; i + 1 < SyncPoints.size(); ++i) {
+ SyncPoints[i]->SetNext(SyncPoints[i + 1]);
+ }
}
TConclusion<bool> TScanHead::BuildNextInterval() {
- if (!Context->IsActive()) {
- return false;
- }
bool changed = false;
- while (!SourcesCollection->IsFinished() && SourcesCollection->CheckInFlightLimits() && Context->IsActive()) {
+ while (SourcesCollection->HasData() && SourcesCollection->CheckInFlightLimits()) {
auto source = SourcesCollection->ExtractNext();
- source->InitFetchingPlan(Context->GetColumnsFetchingPlan(source));
- source->StartProcessing(source);
- FetchingSources.emplace_back(source);
- AFL_VERIFY(FetchingSourcesByIdx.emplace(source->GetSourceIdx(), source).second);
+ SyncPoints.front()->AddSource(source);
changed = true;
}
return changed;
@@ -107,16 +61,15 @@ bool TScanHead::IsReverse() const {
void TScanHead::Abort() {
AFL_VERIFY(!Context->IsActive());
- for (auto&& i : FetchingSources) {
+ for (auto&& i : SyncPoints) {
i->Abort();
}
- FetchingSources.clear();
- SourcesCollection->Clear();
+ SourcesCollection->Abort();
Y_ABORT_UNLESS(IsFinished());
}
TScanHead::~TScanHead() {
- AFL_VERIFY(!IntervalsInFlightCount || !Context->IsActive());
+ AFL_VERIFY(IsFinished() || !Context->IsActive());
}
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
index d38ebef6eb..c505433c14 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
@@ -1,7 +1,9 @@
#pragma once
-#include "collections.h"
#include "source.h"
+#include "collections/abstract.h"
+#include "sync_points/abstract.h"
+
#include <ydb/core/formats/arrow/reader/position.h>
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
@@ -14,27 +16,39 @@ class TPlainReadData;
class TScanHead {
private:
std::shared_ptr<TSpecialReadContext> Context;
- THashMap<ui64, std::shared_ptr<IDataSource>> FetchingSourcesByIdx;
- std::deque<std::shared_ptr<IDataSource>> FetchingSources;
- TPositiveControlInteger IntervalsInFlightCount;
- std::unique_ptr<ISourcesCollection> SourcesCollection;
-
- void StartNextSource(const std::shared_ptr<TPortionDataSource>& source);
+ std::shared_ptr<ISourcesCollection> SourcesCollection;
+ std::vector<std::shared_ptr<ISyncPoint>> SyncPoints;
public:
- ~TScanHead();
+ const std::shared_ptr<ISyncPoint>& GetResultSyncPoint() const {
+ return SyncPoints.back();
+ }
+
+ const std::shared_ptr<ISyncPoint>& GetSyncPoint(const ui32 index) const {
+ AFL_VERIFY(index < SyncPoints.size());
+ return SyncPoints[index];
+ }
- void ContinueSource(const ui32 sourceIdx) const {
- auto it = FetchingSourcesByIdx.find(sourceIdx);
- AFL_VERIFY(it != FetchingSourcesByIdx.end())("source_idx", sourceIdx)("count", FetchingSourcesByIdx.size());
- it->second->ContinueCursor(it->second);
+ ISourcesCollection& MutableSourcesCollection() const {
+ return *SourcesCollection;
}
+ const ISourcesCollection& GetSourcesCollection() const {
+ return *SourcesCollection;
+ }
+
+ ~TScanHead();
+
bool IsReverse() const;
void Abort();
bool IsFinished() const {
- return FetchingSources.empty() && SourcesCollection->IsFinished();
+ for (auto&& i : SyncPoints) {
+ if (!i->IsFinished()) {
+ return false;
+ }
+ }
+ return SourcesCollection->IsFinished();
}
const TReadContext& GetContext() const;
@@ -42,16 +56,14 @@ public:
TString DebugString() const {
TStringBuilder sb;
sb << "S:{" << SourcesCollection->DebugString() << "};";
- sb << "F:";
- for (auto&& i : FetchingSources) {
- sb << i->GetSourceId() << ";";
+ sb << "SP:[";
+ for (auto&& i : SyncPoints) {
+ sb << "{" << i->DebugString() << "};";
}
+ sb << "]";
return sb;
}
- void OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& table, const ui32 startIndex,
- const ui32 recordsCount, TPlainReadData& reader);
-
TConclusionStatus Start();
TScanHead(std::deque<TSourceConstructor>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
index 9f4b04cddd..d7505745e4 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
@@ -26,40 +26,46 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
}
void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) {
- AFL_VERIFY(!ProcessingStarted);
- InitStageData(std::make_unique<TFetchedData>(
- GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount()));
AFL_VERIFY(FetchingPlan);
- ProcessingStarted = true;
- SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
- GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
- SetMemoryGroupId(SourceGroupGuard->GetGroupId());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
- // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
+ if (!ProcessingStarted) {
+ InitStageData(std::make_unique<TFetchedData>(
+ GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount()));
+ ProcessingStarted = true;
+ SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
+ GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
+ SetMemoryGroupId(SourceGroupGuard->GetGroupId());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
+ // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
+ }
TFetchingScriptCursor cursor(FetchingPlan, 0);
- auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
void IDataSource::ContinueCursor(const std::shared_ptr<IDataSource>& sourcePtr) {
- AFL_VERIFY(!!ScriptCursor);
+ AFL_VERIFY(!!ScriptCursor)("source_id", GetSourceId());
if (ScriptCursor->Next()) {
- auto task = std::make_shared<TStepAction>(sourcePtr, std::move(*ScriptCursor), GetContext()->GetCommonContext()->GetScanActorId());
- NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", GetSourceId())("event", "ContinueCursor");
+ auto cursor = std::move(*ScriptCursor);
ScriptCursor.reset();
+ auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId(), true);
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ } else {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", GetSourceId())("event", "CannotContinueCursor");
}
}
void IDataSource::DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr<NCommon::IDataSource>& sourcePtr) {
auto* plainReader = static_cast<TPlainReadData*>(&owner);
- plainReader->MutableScanner().OnSourceReady(std::static_pointer_cast<IDataSource>(sourcePtr), nullptr, 0, GetRecordsCount(), *plainReader);
+ auto sourceSimple = std::static_pointer_cast<IDataSource>(sourcePtr);
+ plainReader->MutableScanner().GetSyncPoint(sourceSimple->GetPurposeSyncPointIndex())->OnSourcePrepared(sourceSimple, *plainReader);
}
void IDataSource::DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) {
TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT_EMPTY", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
ResourceGuards.clear();
StageResult = TFetchedResult::BuildEmpty();
- StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) });
+ StageResult->SetPages({});
ClearStageData();
}
@@ -69,6 +75,7 @@ void IDataSource::DoBuildStageResult(const std::shared_ptr<NCommon::IDataSource>
void IDataSource::Finalize(const std::optional<ui64> memoryLimit) {
TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
+ AFL_VERIFY(!GetStageData().IsEmptyWithData());
if (memoryLimit && !IsSourceInMemory()) {
const auto accessor = GetStageData().GetPortionAccessor();
StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver());
@@ -77,6 +84,10 @@ void IDataSource::Finalize(const std::optional<ui64> memoryLimit) {
StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver());
StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) });
}
+ if (StageResult->IsEmpty()) {
+ StageResult = TFetchedResult::BuildEmpty();
+ StageResult->SetPages({});
+ }
ClearStageData();
}
@@ -374,7 +385,7 @@ private:
Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front()));
Source->InitUsedRawBytes();
AFL_VERIFY(Step.Next());
- auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
index 290a6f2205..22a888aff5 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
@@ -51,6 +51,10 @@ private:
NArrow::TComparablePosition Value;
public:
+ const NArrow::TComparablePosition& GetValue() const {
+ return Value;
+ }
+
TReplaceKeyAdapter(const NArrow::TReplaceKey& rk, const bool reverse)
: Reverse(reverse)
, Value(rk) {
@@ -138,6 +142,8 @@ private:
virtual void DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) override;
void Finalize(const std::optional<ui64> memoryLimit);
+ bool NeedFullAnswerFlag = true;
+ std::optional<ui32> PurposeSyncPointIndex;
protected:
std::optional<ui64> UsedRawBytes;
@@ -149,6 +155,33 @@ protected:
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0;
public:
+ bool NeedFullAnswer() const {
+ return NeedFullAnswerFlag;
+ }
+
+ void SetNeedFullAnswer(const bool value) {
+ NeedFullAnswerFlag = value;
+ }
+
+ ui32 GetPurposeSyncPointIndex() const {
+ AFL_VERIFY(PurposeSyncPointIndex);
+ return *PurposeSyncPointIndex;
+ }
+
+ void ResetPurposeSyncPointIndex() {
+ AFL_VERIFY(PurposeSyncPointIndex);
+ PurposeSyncPointIndex.reset();
+ }
+
+ void SetPurposeSyncPointIndex(const ui32 value) {
+ if (!PurposeSyncPointIndex) {
+ AFL_VERIFY(value == 0);
+ } else {
+ AFL_VERIFY(*PurposeSyncPointIndex < value);
+ }
+ PurposeSyncPointIndex = value;
+ }
+
virtual void InitUsedRawBytes() = 0;
ui64 GetUsedRawBytes() const {
@@ -226,6 +259,9 @@ public:
virtual bool HasIndexes(const std::set<ui32>& indexIds) const = 0;
void InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching);
+ bool HasFetchingPlan() const {
+ return !!FetchingPlan;
+ }
virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const = 0;
@@ -470,10 +506,10 @@ public:
return item.Start < Start;
}
- std::shared_ptr<TPortionDataSource> Construct(const std::shared_ptr<TSpecialReadContext>& context) const {
+ std::shared_ptr<TPortionDataSource> Construct(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context) const {
const auto& portions = context->GetReadMetadata()->SelectInfo->Portions;
- AFL_VERIFY(PortionIdx < portions.size());
- auto result = std::make_shared<TPortionDataSource>(PortionIdx, portions[PortionIdx], context);
+ AFL_VERIFY(sourceIdx < portions.size());
+ auto result = std::make_shared<TPortionDataSource>(sourceIdx, portions[PortionIdx], context);
if (IsStartedByCursorFlag) {
result->SetIsStartedByCursor();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp
new file mode 100644
index 0000000000..9d840e4643
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp
@@ -0,0 +1,93 @@
+#include "abstract.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h>
+
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+void ISyncPoint::OnSourcePrepared(const std::shared_ptr<IDataSource>& sourceInput, TPlainReadData& reader) {
+ const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("aborted", AbortFlag);
+ if (AbortFlag) {
+ FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourceInput->AddEvent("a" + GetShortPointName()));
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "sync_point_aborted")("source_id", sourceInput->GetSourceId());
+ return;
+ } else {
+ FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourceInput->AddEvent("f" + GetShortPointName()));
+ }
+ AFL_DEBUG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG)("event_log", sourceInput->GetEventsReport())("count", SourcesSequentially.size())(
+ "source_id", sourceInput->GetSourceId());
+ AFL_VERIFY(sourceInput->IsSyncSection())("source_id", sourceInput->GetSourceId());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "OnSourcePrepared")("source_id", sourceInput->GetSourceId());
+ while (SourcesSequentially.size() && IsSourcePrepared(SourcesSequentially.front())) {
+ auto source = SourcesSequentially.front();
+ switch (OnSourceReady(source, reader)) {
+ case ESourceAction::Finish: {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "finish_source")("source_id", source->GetSourceId());
+ reader.GetScanner().MutableSourcesCollection().OnSourceFinished(source);
+ SourcesSequentially.pop_front();
+ break;
+ }
+ case ESourceAction::ProvideNext: {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "provide_source")("source_id", source->GetSourceId());
+ if (Next) {
+ source->ResetSourceFinishedFlag();
+ Next->AddSource(source);
+ } else {
+ reader.GetScanner().MutableSourcesCollection().OnSourceFinished(source);
+ }
+ SourcesSequentially.pop_front();
+ break;
+ }
+ case ESourceAction::Wait: {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "wait_source")("source_id", source->GetSourceId());
+ return;
+ }
+ }
+ }
+}
+
+TString ISyncPoint::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ for (auto&& i : SourcesSequentially) {
+ sb << i->GetSourceId() << ",";
+ }
+ sb << "}";
+ return sb;
+}
+
+void ISyncPoint::Continue(const TPartialSourceAddress& continueAddress, TPlainReadData& /*reader*/) {
+ AFL_VERIFY(PointIndex == continueAddress.GetSyncPointIndex());
+ AFL_VERIFY(SourcesSequentially.size() && SourcesSequentially.front()->GetSourceId() == continueAddress.GetSourceId())("first_source_id",
+ SourcesSequentially.front()->GetSourceId())(
+ "continue_source_id", continueAddress.GetSourceId());
+ const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("event", "continue_source");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", SourcesSequentially.front()->GetSourceId());
+ SourcesSequentially.front()->ContinueCursor(SourcesSequentially.front());
+}
+
+void ISyncPoint::AddSource(const std::shared_ptr<IDataSource>& source) {
+ const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("sync_point", GetPointName())("event", "add_source");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("source_id", source->GetSourceId());
+ AFL_VERIFY(!AbortFlag);
+ source->SetPurposeSyncPointIndex(GetPointIndex());
+ if (Next) {
+ source->SetNeedFullAnswer(false);
+ }
+ AFL_VERIFY(!!source);
+ if (!LastSourceIdx) {
+ LastSourceIdx = source->GetSourceIdx();
+ } else {
+ AFL_VERIFY(*LastSourceIdx < source->GetSourceIdx());
+ }
+ LastSourceIdx = source->GetSourceIdx();
+ SourcesSequentially.emplace_back(source);
+ if (!source->HasFetchingPlan()) {
+ source->InitFetchingPlan(Context->GetColumnsFetchingPlan(source));
+ }
+ OnAddSource(source);
+ source->StartProcessing(source);
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h
new file mode 100644
index 0000000000..5ef3e4ede2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h
@@ -0,0 +1,83 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TPlainReadData;
+class ISourcesCollection;
+
+class ISyncPoint {
+public:
+ enum class ESourceAction {
+ Finish,
+ ProvideNext,
+ Wait
+ };
+
+private:
+ YDB_READONLY(ui32, PointIndex, 0);
+ YDB_READONLY_DEF(TString, PointName);
+ std::optional<ui32> LastSourceIdx;
+ virtual void OnAddSource(const std::shared_ptr<IDataSource>& /*source*/) {
+ }
+ virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const = 0;
+ virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) = 0;
+ virtual void DoAbort() = 0;
+ bool AbortFlag = false;
+
+protected:
+ const std::shared_ptr<TSpecialReadContext> Context;
+ const std::shared_ptr<ISourcesCollection> Collection;
+ std::shared_ptr<ISyncPoint> Next;
+ std::deque<std::shared_ptr<IDataSource>> SourcesSequentially;
+
+public:
+ virtual ~ISyncPoint() = default;
+
+ void Continue(const TPartialSourceAddress& continueAddress, TPlainReadData& reader);
+
+ TString DebugString() const;
+
+ void Abort() {
+ SourcesSequentially.clear();
+ if (!AbortFlag) {
+ AbortFlag = true;
+ DoAbort();
+ }
+ }
+
+ bool IsFinished() const {
+ return SourcesSequentially.empty();
+ }
+
+ void SetNext(const std::shared_ptr<ISyncPoint>& next) {
+ AFL_VERIFY(!Next);
+ Next = next;
+ }
+
+ TString GetShortPointName() const {
+ if (PointName.size() < 2) {
+ return PointName;
+ } else {
+ return PointName.substr(0, 2);
+ }
+ }
+
+ ISyncPoint(const ui32 pointIndex, const TString& pointName, const std::shared_ptr<TSpecialReadContext>& context,
+ const std::shared_ptr<ISourcesCollection>& collection)
+ : PointIndex(pointIndex)
+ , PointName(pointName)
+ , Context(context)
+ , Collection(collection) {
+ }
+
+ void AddSource(const std::shared_ptr<IDataSource>& source);
+
+ void OnSourcePrepared(const std::shared_ptr<IDataSource>& sourceInput, TPlainReadData& reader);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp
new file mode 100644
index 0000000000..31c4977f35
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp
@@ -0,0 +1,85 @@
+#include "limit.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/limit_sorted.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+TSyncPointLimitControl::TSyncPointLimitControl(const ui32 limit, const ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context,
+ const std::shared_ptr<TScanWithLimitCollection>& collection)
+ : TBase(pointIndex, "SYNC_LIMIT", context, collection)
+ , Limit(limit)
+ , Collection(collection) {
+ AFL_VERIFY(Collection);
+}
+
+bool TSyncPointLimitControl::DrainToLimit() {
+ std::optional<TSourceIterator> nextInHeap;
+ if (Collection->GetNextSource()) {
+ nextInHeap = TSourceIterator(Collection->GetNextSource());
+ }
+ if (Iterators.empty() || (nextInHeap && Iterators.front() < *nextInHeap)) {
+ return false;
+ }
+
+ while (Iterators.size()) {
+ if (!Iterators.front().IsFilled()) {
+ return false;
+ }
+ std::pop_heap(Iterators.begin(), Iterators.end());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "LimitIteratorNext")("source_id", Iterators.back().GetSourceId())(
+ "fetched", FetchedCount)("limit", Limit)("iterators", Iterators.size());
+ if (!Iterators.back().Next()) {
+ Iterators.pop_back();
+ } else {
+ std::push_heap(Iterators.begin(), Iterators.end());
+ if (++FetchedCount >= Limit) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& /*reader*/) {
+ if (FetchedCount >= Limit) {
+ return ESourceAction::Finish;
+ }
+ const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey();
+ const auto& g = source->GetStageResult().GetBatch();
+ AFL_VERIFY(Iterators.size());
+ AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId());
+ std::pop_heap(Iterators.begin(), Iterators.end());
+ if (!g || !g->GetRecordsCount()) {
+ Iterators.pop_back();
+ } else {
+ std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> arrs;
+ for (auto&& i : rk.fields()) {
+ auto acc = g->GetAccessorByNameOptional(i->name());
+ if (!acc) {
+ break;
+ }
+ arrs.emplace_back(acc);
+ }
+ AFL_VERIFY(arrs.size());
+ if (!PKPrefixSize) {
+ PKPrefixSize = arrs.size();
+ } else {
+ AFL_VERIFY(*PKPrefixSize == arrs.size())("prefix", PKPrefixSize)("arr", arrs.size());
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoOnSourceCheckLimitFillIterator")("source_id", source->GetSourceId())(
+ "fetched", FetchedCount)("limit", Limit);
+ Iterators.back() = TSourceIterator(arrs, source->GetStageResult().GetNotAppliedFilter(), source);
+ AFL_VERIFY(Iterators.back().IsFilled());
+ std::push_heap(Iterators.begin(), Iterators.end());
+ }
+ if (DrainToLimit()) {
+ Collection->Clear();
+ }
+ if (source->GetStageResult().IsEmpty()) {
+ return ESourceAction::Finish;
+ } else {
+ return ESourceAction::ProvideNext;
+ }
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h
new file mode 100644
index 0000000000..b18582345b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h
@@ -0,0 +1,140 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TScanWithLimitCollection;
+
+class TSyncPointLimitControl: public ISyncPoint {
+private:
+ using TBase = ISyncPoint;
+
+ const ui32 Limit;
+ std::shared_ptr<TScanWithLimitCollection> Collection;
+ ui32 FetchedCount = 0;
+ std::optional<ui32> PKPrefixSize;
+
+ virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const override {
+ if (source->IsSyncSection() && source->HasStageResult()) {
+ AFL_VERIFY(!source->GetStageResult().HasResultChunk());
+ return true;
+ }
+ return false;
+ }
+ class TSourceIterator {
+ private:
+ std::shared_ptr<IDataSource> Source;
+ bool Reverse;
+ int Delta = 0;
+ i64 Start = 0;
+ i64 Finish = 0;
+ std::shared_ptr<NArrow::NMerger::TRWSortableBatchPosition> SortableRecord;
+ std::shared_ptr<NArrow::TColumnFilter> Filter;
+ std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
+ bool IsValidFlag = true;
+
+ bool ShiftWithFilter() const {
+ AFL_VERIFY(IsValidFlag);
+ while (!FilterIterator->GetCurrentAcceptance()) {
+ if (!FilterIterator->Next(1)) {
+ AFL_VERIFY(!SortableRecord->NextPosition(Delta));
+ return false;
+ } else {
+ AFL_VERIFY(SortableRecord->NextPosition(Delta));
+ }
+ }
+ return true;
+ }
+
+ public:
+ const std::shared_ptr<IDataSource>& GetSource() const {
+ AFL_VERIFY(Source);
+ return Source;
+ }
+
+ TSourceIterator(const std::shared_ptr<IDataSource>& source)
+ : Source(source)
+ , Reverse(Source->GetContext()->GetReadMetadata()->IsDescSorted())
+ , Delta(Reverse ? -1 : 1) {
+ AFL_VERIFY(Source);
+ auto arr = Source->GetStart().GetValue().GetArrays();
+ auto batch =
+ arrow::RecordBatch::Make(Source->GetSourceSchema()->GetIndexInfo().GetReplaceKey(), arr.front()->length(), std::move(arr));
+ SortableRecord =
+ std::make_shared<NArrow::NMerger::TRWSortableBatchPosition>(batch, Source->GetStart().GetValue().GetMonoPosition(), Reverse);
+ }
+
+ TSourceIterator(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& arrs,
+ const std::shared_ptr<NArrow::TColumnFilter>& filter, const std::shared_ptr<IDataSource>& source)
+ : Source(source)
+ , Reverse(Source->GetContext()->GetReadMetadata()->IsDescSorted())
+ , Delta(Reverse ? -1 : 1)
+ , Start(Reverse ? (arrs.front()->GetRecordsCount() - 1) : 0)
+ , Finish(Reverse ? 0 : (arrs.front()->GetRecordsCount() - 1))
+ , Filter(filter ? filter : std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter())) {
+ AFL_VERIFY(arrs.size());
+ AFL_VERIFY(arrs.front()->GetRecordsCount());
+ FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(Reverse, arrs.front()->GetRecordsCount()));
+ auto prefixSchema = Source->GetSourceSchema()->GetIndexInfo().GetReplaceKeyPrefix(arrs.size());
+ auto copyArrs = arrs;
+ auto batch = std::make_shared<NArrow::TGeneralContainer>(prefixSchema->fields(), std::move(copyArrs));
+ SortableRecord = std::make_shared<NArrow::NMerger::TRWSortableBatchPosition>(batch, Start, Reverse);
+ IsValidFlag = ShiftWithFilter();
+ }
+
+ ui64 GetSourceId() const {
+ AFL_VERIFY(Source);
+ return Source->GetSourceId();
+ }
+
+ bool IsFilled() const {
+ return !!Filter;
+ }
+
+ bool IsValid() const {
+ return IsValidFlag;
+ }
+
+ bool Next() {
+ AFL_VERIFY(IsValidFlag);
+ AFL_VERIFY(!!SortableRecord);
+ AFL_VERIFY(!!Filter);
+ IsValidFlag = SortableRecord->NextPosition(Delta);
+ AFL_VERIFY(FilterIterator->Next(1) == IsValidFlag);
+ if (IsValidFlag) {
+ IsValidFlag = ShiftWithFilter();
+ }
+ return IsValidFlag;
+ }
+
+ bool operator<(const TSourceIterator& item) const {
+ const auto cmp = SortableRecord->ComparePartial(*item.SortableRecord);
+ if (cmp == std::partial_ordering::equivalent) {
+ return item.Source->GetSourceId() < Source->GetSourceId();
+ }
+ return cmp == std::partial_ordering::greater;
+ }
+ };
+
+ std::vector<TSourceIterator> Iterators;
+
+ virtual void OnAddSource(const std::shared_ptr<IDataSource>& source) override {
+ AFL_VERIFY(FetchedCount < Limit);
+ Iterators.emplace_back(TSourceIterator(source));
+ std::push_heap(Iterators.begin(), Iterators.end());
+ }
+
+ virtual void DoAbort() override {
+ Iterators.clear();
+ }
+
+ virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) override;
+
+ bool DrainToLimit();
+
+public:
+ TSyncPointLimitControl(const ui32 limit, const ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context,
+ const std::shared_ptr<TScanWithLimitCollection>& collection);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp
new file mode 100644
index 0000000000..6d301e9482
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp
@@ -0,0 +1,35 @@
+#include "result.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+ISyncPoint::ESourceAction TSyncPointResult::OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) {
+ if (source->GetStageResult().IsEmpty()) {
+ return ESourceAction::Finish;
+ }
+ auto resultChunk = source->MutableStageResult().ExtractResultChunk();
+ const bool isFinished = source->GetStageResult().IsFinished();
+ if (resultChunk && resultChunk->HasData()) {
+ std::optional<TPartialSourceAddress> partialSourceAddress;
+ if (!isFinished) {
+ partialSourceAddress = TPartialSourceAddress(source->GetSourceId(), source->GetSourceIdx(), GetPointIndex());
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "has_result")("source_id", source->GetSourceId())(
+ "source_idx", source->GetSourceIdx())("table", resultChunk->GetTable()->num_rows())("is_finished", isFinished);
+ auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount());
+ reader.OnIntervalResult(std::make_shared<TPartialReadResult>(source->GetResourceGuards(), source->GetGroupGuard(),
+ resultChunk->GetTable(), cursor, Context->GetCommonContext(), partialSourceAddress));
+ } else if (!isFinished) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "continue_source")("source_id", source->GetSourceId())(
+ "source_idx", source->GetSourceIdx());
+ source->ContinueCursor(source);
+ }
+ if (!isFinished) {
+ return ESourceAction::Wait;
+ }
+ source->ClearResult();
+ return ESourceAction::ProvideNext;
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h
new file mode 100644
index 0000000000..8df2879d93
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TSyncPointResult: public ISyncPoint {
+private:
+ using TBase = ISyncPoint;
+ virtual void DoAbort() override {
+ }
+
+ virtual ESourceAction OnSourceReady(const std::shared_ptr<IDataSource>& source, TPlainReadData& reader) override;
+ virtual bool IsSourcePrepared(const std::shared_ptr<IDataSource>& source) const override {
+ return source->IsSyncSection() && source->HasStageResult() &&
+ (source->GetStageResult().HasResultChunk() || source->GetStageResult().IsEmpty());
+ }
+
+public:
+ TSyncPointResult(
+ const ui32 pointIndex, const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<ISourcesCollection>& collection)
+ : TBase(pointIndex, "RESULT", context, collection) {
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make
new file mode 100644
index 0000000000..58d66933ef
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ result.cpp
+ limit.cpp
+)
+
+PEERDIR(
+ ydb/core/formats/arrow
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
index 3917d8d913..baff60941f 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
@@ -8,13 +8,14 @@ SRCS(
context.cpp
fetching.cpp
iterator.cpp
- collections.cpp
)
PEERDIR(
ydb/core/formats/arrow
ydb/core/tx/columnshard/blobs_action
ydb/core/tx/columnshard/engines/reader/common_reader/iterator
+ ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections
+ ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points
ydb/core/tx/conveyor/usage
ydb/core/tx/limiter/grouped_memory/usage
)
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 92574ef1a2..7fc0cce69a 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -382,6 +382,16 @@ public:
return PKColumnIds[0];
}
+ std::shared_ptr<arrow::Schema> GetReplaceKeyPrefix(const ui32 size) const {
+ AFL_VERIFY(size);
+ AFL_VERIFY(size <= (ui32)PrimaryKey->num_fields());
+ if (size == (ui32)PrimaryKey->num_fields()) {
+ return PrimaryKey;
+ } else {
+ std::vector<std::shared_ptr<arrow::Field>> fields(PrimaryKey->fields().begin(), PrimaryKey->fields().begin() + size);
+ return std::make_shared<arrow::Schema>(std::move(fields));
+ }
+ }
const std::shared_ptr<arrow::Schema>& GetReplaceKey() const {
return PrimaryKey;
}
diff --git a/ydb/library/formats/arrow/replace_key.h b/ydb/library/formats/arrow/replace_key.h
index 5e8de57478..c4fe75d21d 100644
--- a/ydb/library/formats/arrow/replace_key.h
+++ b/ydb/library/formats/arrow/replace_key.h
@@ -245,6 +245,23 @@ private:
std::vector<ui32> Positions;
public:
+ ui32 GetMonoPosition() const {
+ std::optional<ui32> result;
+ for (auto&& i : Positions) {
+ if (!result) {
+ result = i;
+ } else {
+ AFL_VERIFY(*result == i);
+ }
+ }
+ AFL_VERIFY(result);
+ return *result;
+ }
+
+ const std::vector<std::shared_ptr<arrow::Array>>& GetArrays() const {
+ return Arrays;
+ }
+
TComparablePosition(const TReplaceKey& key)
: Arrays(*key.GetColumns())
, Positions(Arrays.size(), key.GetPosition()) {