diff options
author | chertus <azuikov@ydb.tech> | 2023-03-22 12:50:53 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-22 12:50:53 +0300 |
commit | c662852c21526004a43573404fc01a6ac70a7b64 (patch) | |
tree | 44f648f25a222163f44e56d86aa1d71cc7330cbc | |
parent | 42aab38139d46bca78ef6194988cee914c4e0d05 (diff) | |
download | ydb-c662852c21526004a43573404fc01a6ac70a7b64.tar.gz |
faster TMergingSortedInputStream::Merge()
-rw-r--r-- | ydb/core/formats/merging_sorted_input_stream.cpp | 65 | ||||
-rw-r--r-- | ydb/core/formats/merging_sorted_input_stream.h | 4 | ||||
-rw-r--r-- | ydb/core/formats/replace_key.h | 12 |
3 files changed, 62 insertions, 19 deletions
diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp index 8a5ee53b52b..57728116a6b 100644 --- a/ydb/core/formats/merging_sorted_input_stream.cpp +++ b/ydb/core/formats/merging_sorted_input_stream.cpp @@ -47,6 +47,10 @@ public: return MaxRows && (AddedRows >= MaxRows); } + bool HasLimit() const override { + return MaxRows; + } + private: TBuilders& Columns; std::vector<std::pair<const TArrayVec*, size_t>> Rows; @@ -83,6 +87,10 @@ public: return MaxRows && (AddedRows >= MaxRows); } + bool HasLimit() const override { + return MaxRows; + } + std::shared_ptr<arrow::RecordBatch> GetBatch() { if (Batch) { return Batch->Slice(Offset, AddedRows); @@ -204,28 +212,40 @@ void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSort /// Take rows in required order and put them into `rowBuffer`, /// while the number of rows are no more than `max_block_size` -void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { +template <bool replace, bool limit> +void TMergingSortedInputStream::MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { + if constexpr (replace) { + if (!PrevKey && queue.IsValid()) { + auto current = queue.Current(); + PrevKey = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow()); + if (!rowsBuffer.AddRow(current)) { + return; + } + // Do not get Next() for simplicity. Lead to a dup + } + } + while (queue.IsValid()) { - if (rowsBuffer.Limit()) { - rowsBuffer.Flush(); - return; + if constexpr (limit) { + if (rowsBuffer.Limit()) { + return; + } } auto current = queue.Current(); - if (Description->Replace()) { - auto key = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow()); - bool isDup = (PrevKey && *key == *PrevKey); + if constexpr (replace) { + TReplaceKey key(current->replace_columns, current->getRow()); - if (isDup || rowsBuffer.AddRow(current)) { - PrevKey = key; + if (key == *PrevKey) { + // do nothing + } else if (rowsBuffer.AddRow(current)) { + *PrevKey = key; } else { - rowsBuffer.Flush(); return; } } else { if (!rowsBuffer.AddRow(current)) { - rowsBuffer.Flush(); return; } } @@ -238,11 +258,30 @@ void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& que } } - rowsBuffer.Flush(); - /// We have read all data. Ask children to cancel providing more data. Cancel(); Finished = true; } +void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { + const bool replace = Description->Replace(); + const bool limit = rowsBuffer.HasLimit(); + + if (replace) { + if (limit) { + MergeImpl<true, true>(rowsBuffer, queue); + } else { + MergeImpl<true, false>(rowsBuffer, queue); + } + } else { + if (limit) { + MergeImpl<false, true>(rowsBuffer, queue); + } else { + MergeImpl<false, false>(rowsBuffer, queue); + } + } + + rowsBuffer.Flush(); +} + } diff --git a/ydb/core/formats/merging_sorted_input_stream.h b/ydb/core/formats/merging_sorted_input_stream.h index e66493d57a2..b5250a7f337 100644 --- a/ydb/core/formats/merging_sorted_input_stream.h +++ b/ydb/core/formats/merging_sorted_input_stream.h @@ -12,6 +12,7 @@ struct IRowsBuffer { virtual bool AddRow(const TSortCursor& cursor) = 0; virtual void Flush() = 0; virtual bool Limit() const = 0; + virtual bool HasLimit() const = 0; }; /// Merges several sorted streams into one sorted stream. @@ -44,6 +45,9 @@ private: void Init(); void FetchNextBatch(const TSortCursor& current, TSortingHeap& queue); void Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue); + + template <bool replace, bool limit> + void MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue); }; } diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h index c902ea46a67..19da041b42b 100644 --- a/ydb/core/formats/replace_key.h +++ b/ydb/core/formats/replace_key.h @@ -11,7 +11,7 @@ public: : Columns(columns) , Position(position) { - Y_VERIFY(Size() > 0 && Position < Column(0).length()); + Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length()); } size_t Hash() const { @@ -21,10 +21,10 @@ public: // TODO: NULLs template<typename T> bool operator == (const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY(Size() == key.Size()); + Y_VERIFY_DEBUG(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { - Y_VERIFY(Column(i).type_id() == key.Column(i).type_id()); + Y_VERIFY_DEBUG(Column(i).type_id() == key.Column(i).type_id()); if (!TypedEquals(Column(i), Position, key.Column(i), key.Position)) { return false; @@ -35,7 +35,7 @@ public: template<typename T> bool operator < (const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY(Size() == key.Size()); + Y_VERIFY_DEBUG(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { int cmp = CompareColumnValue(i, key, i); @@ -50,7 +50,7 @@ public: template<typename T> bool LessNotNull(const TReplaceKeyTemplate<T>& key) const { - Y_VERIFY(Size() == key.Size()); + Y_VERIFY_DEBUG(Size() == key.Size()); for (int i = 0; i < Size(); ++i) { int cmp = CompareColumnValue(i, key, i, true); @@ -65,7 +65,7 @@ public: template<typename T> int CompareColumnValue(int column, const TReplaceKeyTemplate<T>& key, int keyColumn, bool notNull = false) const { - Y_VERIFY(Column(column).type_id() == key.Column(keyColumn).type_id()); + Y_VERIFY_DEBUG(Column(column).type_id() == key.Column(keyColumn).type_id()); if (notNull) { return TypedCompare<true>(Column(column), Position, key.Column(keyColumn), key.Position); |