aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-14 21:13:44 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-14 21:30:51 +0300
commit306a5ee7310d56c5b809e91bad189e67836f5875 (patch)
treedfe4c927199fe5c135b1281379f58556c033e549
parentb8a327d16e258b2d459ec0c25fe08cc456b066b6 (diff)
downloadydb-306a5ee7310d56c5b809e91bad189e67836f5875.tar.gz
KIKIMR-19216: improve merging
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp70
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h176
5 files changed, 156 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 3eb05ff1e0..b5a83d14bd 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -60,9 +60,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows())));
}
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
- mergeStream.AddPoolSource({}, batch, nullptr);
+ mergeStream.AddSource(batch, nullptr);
}
- batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true);
+ batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields);
}
Y_ABORT_UNLESS(batchResults.size());
@@ -208,11 +208,8 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL;
}
-void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) {
- if (CheckPoints.size()) {
- AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less);
- }
- CheckPoints.emplace_back(position);
+void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include) {
+ AFL_VERIFY(CheckPoints.emplace(position, include).second);
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
index 2a5309646c..1ecd56e5a5 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -8,7 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
- std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints;
+ std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
protected:
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
virtual TPortionMeta::EProduced GetResultProducedClass() const override {
@@ -19,7 +19,7 @@ protected:
public:
using TBase::TBase;
- void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position);
+ void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include = true);
virtual TString TypeString() const override {
return StaticTypeName();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 522d2a2eb2..d17e3de001 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -16,7 +16,7 @@ void TFetchingInterval::ConstructResult() {
if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) {
auto rb = i->GetBatch();
if (rb) {
- Merger->AddPoolSource({}, rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
+ Merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
}
i->StartMerging();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index d2c1676aa8..9141a15c0d 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -5,43 +5,28 @@ namespace NKikimr::NOlap::NIndexedReader {
void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
Y_ABORT_UNLESS(point);
- Y_ABORT_UNLESS(point->IsSameSortingSchema(SortSchema));
+ AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point->IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);
- SortHeap.emplace_back(TBatchIterator(*point));
- std::push_heap(SortHeap.begin(), SortHeap.end());
+ SortHeap.Push(TBatchIterator(*point));
}
-void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
+void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
if (!batch || !batch->num_rows()) {
return;
}
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
- if (!poolId) {
- AddNewToHeap(poolId, batch, filter, true);
- } else {
- auto it = BatchPools.find(*poolId);
- if (it == BatchPools.end()) {
- it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first;
- }
- it->second.emplace_back(batch, filter);
- if (it->second.size() == 1) {
- AddNewToHeap(poolId, batch, filter, true);
- }
- }
+ AddNewToHeap(batch, filter);
}
-void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
+void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
if (!filter || filter->IsTotalAllowFilter()) {
- SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
+ SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse));
} else if (filter->IsTotalDenyFilter()) {
return;
} else {
- SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
- }
- if (restoreHeap) {
- std::push_heap(SortHeap.begin(), SortHeap.end());
+ SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse));
}
}
@@ -49,8 +34,8 @@ void TMergePartialStream::RemoveControlPoint() {
Y_ABORT_UNLESS(ControlPoints == 1);
Y_ABORT_UNLESS(ControlPointEnriched());
Y_ABORT_UNLESS(-- ControlPoints == 0);
- std::pop_heap(SortHeap.begin(), SortHeap.end());
- SortHeap.pop_back();
+ Y_ABORT_UNLESS(SortHeap.Current().IsControlPoint());
+ SortHeap.RemoveTop();
}
void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
@@ -73,11 +58,11 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
bool cpReachedFlag = false;
- while (SortHeap.size() && !cpReachedFlag) {
- if (SortHeap.front().IsControlPoint()) {
+ while (SortHeap.Size() && !cpReachedFlag) {
+ if (SortHeap.Current().IsControlPoint()) {
RemoveControlPoint();
cpReachedFlag = true;
- if (SortHeap.empty() || !includeFinish || SortHeap.front().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) {
+ if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) {
return true;
}
}
@@ -92,7 +77,7 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo
bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
- while (SortHeap.size()) {
+ while (SortHeap.Size()) {
if (auto currentPosition = DrainCurrentPosition()) {
CheckSequenceInDebug(*currentPosition);
builder.AddRecord(*currentPosition);
@@ -102,19 +87,19 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
}
std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
- Y_ABORT_UNLESS(SortHeap.size());
- Y_ABORT_UNLESS(!SortHeap.front().IsControlPoint());
- TSortableBatchPosition result = SortHeap.front().GetKeyColumns();
- TSortableBatchPosition resultVersion = SortHeap.front().GetVersionColumns();
+ Y_ABORT_UNLESS(SortHeap.Size());
+ Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
+ TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
+ TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
bool isFirst = true;
- const bool deletedFlag = SortHeap.front().IsDeleted();
- while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
- auto& anotherIterator = SortHeap.front();
+ const bool deletedFlag = SortHeap.Current().IsDeleted();
+ while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) {
+ auto& anotherIterator = SortHeap.Current();
if (!isFirst) {
- AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
+ AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", result.DebugJson());
}
- NextInHeap(true);
+ SortHeap.Next();
isFirst = false;
}
if (deletedFlag) {
@@ -123,13 +108,13 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition(
return result;
}
-std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::vector<TSortableBatchPosition>& positions,
- const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions)
+std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
+ const std::vector<std::shared_ptr<arrow::Field>>& resultFields)
{
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
for (auto&& i : positions) {
NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields);
- DrainCurrentTo(indexesBuilder, i, includePositions);
+ DrainCurrentTo(indexesBuilder, i.first, i.second);
result.emplace_back(indexesBuilder.Finalize());
if (result.back()->num_rows() == 0) {
result.pop_back();
@@ -147,11 +132,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
NJson::TJsonValue result;
result["is_cp"] = IsControlPoint();
- if (PoolId) {
- result["pool_id"] = *PoolId;
- } else {
- result["pool_id"] = "absent";
- }
result["key"] = KeyColumns.DebugJson();
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index c5d52219f2..c7884bd7ab 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -14,6 +14,120 @@ namespace NKikimr::NOlap::NIndexedReader {
class TRecordBatchBuilder;
+template <class TSortCursor>
+class TSortingHeap {
+public:
+ TSortingHeap() = default;
+
+ template <typename TCursors>
+ TSortingHeap(TCursors& cursors, bool notNull) {
+ Queue.reserve(cursors.size());
+ for (auto& cur : cursors) {
+ if (!cur.Empty()) {
+ Queue.emplace_back(TSortCursor(&cur, notNull));
+ }
+ }
+ std::make_heap(Queue.begin(), Queue.end());
+ }
+
+ const TSortCursor& Current() const { return Queue.front(); }
+ size_t Size() const { return Queue.size(); }
+ bool Empty() const { return Queue.empty(); }
+ TSortCursor& NextChild() { return Queue[NextChildIndex()]; }
+
+ void Next() {
+ Y_ABORT_UNLESS(Size());
+
+ if (Queue.front().Next()) {
+ UpdateTop();
+ } else {
+ RemoveTop();
+ }
+ }
+
+ void RemoveTop() {
+ std::pop_heap(Queue.begin(), Queue.end());
+ Queue.pop_back();
+ NextIdx = 0;
+ }
+
+ void Push(TSortCursor&& cursor) {
+ Queue.emplace_back(cursor);
+ std::push_heap(Queue.begin(), Queue.end());
+ NextIdx = 0;
+ }
+
+ NJson::TJsonValue DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_ARRAY;
+ for (auto&& i : Queue) {
+ result.AppendValue(i.DebugJson());
+ }
+ return result;
+ }
+
+private:
+ std::vector<TSortCursor> Queue;
+ /// Cache comparison between first and second child if the order in queue has not been changed.
+ size_t NextIdx = 0;
+
+ size_t NextChildIndex() {
+ if (NextIdx == 0) {
+ NextIdx = 1;
+ if (Queue.size() > 2 && Queue[1] < Queue[2]) {
+ ++NextIdx;
+ }
+ }
+
+ return NextIdx;
+ }
+
+ /// This is adapted version of the function __sift_down from libc++.
+ /// Why cannot simply use std::priority_queue?
+ /// - because it doesn't support updating the top element and requires pop and push instead.
+ /// Also look at "Boost.Heap" library.
+ void UpdateTop() {
+ size_t size = Queue.size();
+ if (size < 2)
+ return;
+
+ auto begin = Queue.begin();
+
+ size_t child_idx = NextChildIndex();
+ auto child_it = begin + child_idx;
+
+ /// Check if we are in order.
+ if (*child_it < *begin)
+ return;
+
+ NextIdx = 0;
+
+ auto curr_it = begin;
+ auto top(std::move(*begin));
+ do {
+ /// We are not in heap-order, swap the parent with it's largest child.
+ *curr_it = std::move(*child_it);
+ curr_it = child_it;
+
+ // recompute the child based off of the updated parent
+ child_idx = 2 * child_idx + 1;
+
+ if (child_idx >= size)
+ break;
+
+ child_it = begin + child_idx;
+
+ if ((child_idx + 1) < size && *child_it < *(child_it + 1)) {
+ /// Right child exists and is greater than left child.
+ ++child_it;
+ ++child_idx;
+ }
+
+ /// Check if we are in order.
+ } while (!(*child_it < top));
+ *curr_it = std::move(top);
+ }
+};
+
class TMergePartialStream {
private:
#ifndef NDEBUG
@@ -26,7 +140,6 @@ private:
TSortableBatchPosition VersionColumns;
i64 RecordsCount;
int ReverseSortKff;
- YDB_OPT(ui32, PoolId);
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
@@ -62,13 +175,12 @@ private:
}
TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
- const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::optional<ui32> poolId)
+ const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort)
: ControlPointFlag(false)
, KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
, VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), {}, false)
, RecordsCount(batch->num_rows())
, ReverseSortKff(reverseSort ? -1 : 1)
- , PoolId(poolId)
, Filter(filter)
{
Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
@@ -136,43 +248,11 @@ private:
result["current"] = CurrentKeyColumns->DebugJson();
}
#endif
- for (auto&& i : SortHeap) {
- result["heap"].AppendValue(i.DebugJson());
- }
+ result.InsertValue("heap", SortHeap.DebugJson());
return result;
}
- bool NextInHeap(const bool needPop) {
- if (SortHeap.empty()) {
- return false;
- }
- if (needPop) {
- std::pop_heap(SortHeap.begin(), SortHeap.end());
- }
- if (SortHeap.back().Next()) {
- std::push_heap(SortHeap.begin(), SortHeap.end());
- } else if (!SortHeap.back().HasPoolId()) {
- SortHeap.pop_back();
- } else {
- auto it = BatchPools.find(SortHeap.back().GetPoolIdUnsafe());
- Y_ABORT_UNLESS(it->second.size());
- if (it->second.size() == 1) {
- BatchPools.erase(it);
- SortHeap.pop_back();
- } else {
- it->second.pop_front();
- TBatchIterator oldIterator = std::move(SortHeap.back());
- SortHeap.pop_back();
- AddNewToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false);
- oldIterator.CheckNextBatch(SortHeap.back());
- std::push_heap(SortHeap.begin(), SortHeap.end());
- }
- }
- return SortHeap.size();
- }
-
- THashMap<ui32, std::deque<TIteratorData>> BatchPools;
- std::vector<TBatchIterator> SortHeap;
+ TSortingHeap<TBatchIterator> SortHeap;
std::shared_ptr<arrow::Schema> SortSchema;
std::shared_ptr<arrow::Schema> DataSchema;
const bool Reverse;
@@ -180,7 +260,7 @@ private:
std::optional<TSortableBatchPosition> DrainCurrentPosition();
- void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap);
+ void AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse)
@@ -193,15 +273,7 @@ public:
}
bool IsValid() const {
- return SortHeap.size();
- }
-
- bool HasRecordsInPool(const ui32 poolId) const {
- auto it = BatchPools.find(poolId);
- if (it == BatchPools.end()) {
- return false;
- }
- return it->second.size();
+ return SortHeap.Size();
}
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
@@ -209,19 +281,19 @@ public:
void RemoveControlPoint();
bool ControlPointEnriched() const {
- return SortHeap.size() && SortHeap.front().IsControlPoint();
+ return SortHeap.Size() && SortHeap.Current().IsControlPoint();
}
- void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
+ void AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
bool IsEmpty() const {
- return SortHeap.empty();
+ return !SortHeap.Size();
}
bool DrainAll(TRecordBatchBuilder& builder);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish);
- std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::vector<TSortableBatchPosition>& positions,
- const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
+ const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
};
class TRecordBatchBuilder {