about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: chertus <azuikov@ydb.tech> 2023-03-22 12:50:53 +0300
committer: chertus <azuikov@ydb.tech> 2023-03-22 12:50:53 +0300
commit: c662852c21526004a43573404fc01a6ac70a7b64 (patch)
tree: 44f648f25a222163f44e56d86aa1d71cc7330cbc
parent: 42aab38139d46bca78ef6194988cee914c4e0d05 (diff)
download: ydb-c662852c21526004a43573404fc01a6ac70a7b64.tar.gz
faster TMergingSortedInputStream::Merge()
-rw-r--r-- ydb/core/formats/merging_sorted_input_stream.cpp | 65
-rw-r--r-- ydb/core/formats/merging_sorted_input_stream.h | 4
-rw-r--r-- ydb/core/formats/replace_key.h | 12
3 files changed, 62 insertions, 19 deletions
diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp
index 8a5ee53b52b..57728116a6b 100644
--- a/ydb/core/formats/merging_sorted_input_stream.cpp
+++ b/ydb/core/formats/merging_sorted_input_stream.cpp
@@ -47,6 +47,10 @@ public:
return MaxRows && (AddedRows >= MaxRows);
}
+ bool HasLimit() const override { // Tells the caller whether this buffer has a row cap configured at all.
+ return MaxRows; // converted to bool: true exactly when MaxRows is set to a value that tests true
+ }
+
private:
TBuilders& Columns;
std::vector<std::pair<const TArrayVec*, size_t>> Rows;
@@ -83,6 +87,10 @@ public:
return MaxRows && (AddedRows >= MaxRows);
}
+ bool HasLimit() const override { // Tells the caller whether this buffer has a row cap configured at all.
+ return MaxRows; // converted to bool; Limit() above treats a false-testing MaxRows as "never limited"
+ }
+
std::shared_ptr<arrow::RecordBatch> GetBatch() {
if (Batch) {
return Batch->Slice(Offset, AddedRows);
@@ -204,28 +212,40 @@ void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSort
/// Take rows in required order and put them into `rowBuffer`,
/// while the number of rows are no more than `max_block_size`
-void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) {
+template <bool replace, bool limit>
+void TMergingSortedInputStream::MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue) {
+ if constexpr (replace) {
+ if (!PrevKey && queue.IsValid()) {
+ auto current = queue.Current();
+ PrevKey = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow());
+ if (!rowsBuffer.AddRow(current)) {
+ return;
+ }
+ // Do not get Next() for simplicity. Lead to a dup
+ }
+ }
+
while (queue.IsValid()) {
- if (rowsBuffer.Limit()) {
- rowsBuffer.Flush();
- return;
+ if constexpr (limit) {
+ if (rowsBuffer.Limit()) {
+ return;
+ }
}
auto current = queue.Current();
- if (Description->Replace()) {
- auto key = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow());
- bool isDup = (PrevKey && *key == *PrevKey);
+ if constexpr (replace) {
+ TReplaceKey key(current->replace_columns, current->getRow());
- if (isDup || rowsBuffer.AddRow(current)) {
- PrevKey = key;
+ if (key == *PrevKey) {
+ // do nothing
+ } else if (rowsBuffer.AddRow(current)) {
+ *PrevKey = key;
} else {
- rowsBuffer.Flush();
return;
}
} else {
if (!rowsBuffer.AddRow(current)) {
- rowsBuffer.Flush();
return;
}
}
@@ -238,11 +258,30 @@ void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& que
}
}
- rowsBuffer.Flush();
-
/// We have read all data. Ask children to cancel providing more data.
Cancel();
Finished = true;
}
+void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { // Runtime dispatcher: picks one MergeImpl<replace, limit> instantiation, then flushes the buffer.
+ const bool replace = Description->Replace(); // becomes the `replace` template argument
+ const bool limit = rowsBuffer.HasLimit(); // becomes the `limit` template argument
+
+ if (replace) { // both flags are turned into compile-time constants so MergeImpl can branch with `if constexpr`
+ if (limit) {
+ MergeImpl<true, true>(rowsBuffer, queue);
+ } else {
+ MergeImpl<true, false>(rowsBuffer, queue);
+ }
+ } else {
+ if (limit) {
+ MergeImpl<false, true>(rowsBuffer, queue);
+ } else {
+ MergeImpl<false, false>(rowsBuffer, queue);
+ }
+ }
+
+ rowsBuffer.Flush(); // single flush here, executed after whichever MergeImpl variant returned
+}
+
}
diff --git a/ydb/core/formats/merging_sorted_input_stream.h b/ydb/core/formats/merging_sorted_input_stream.h
index e66493d57a2..b5250a7f337 100644
--- a/ydb/core/formats/merging_sorted_input_stream.h
+++ b/ydb/core/formats/merging_sorted_input_stream.h
@@ -12,6 +12,7 @@ struct IRowsBuffer {
virtual bool AddRow(const TSortCursor& cursor) = 0;
virtual void Flush() = 0;
virtual bool Limit() const = 0;
+ virtual bool HasLimit() const = 0;
};
/// Merges several sorted streams into one sorted stream.
@@ -44,6 +45,9 @@ private:
void Init();
void FetchNextBatch(const TSortCursor& current, TSortingHeap& queue);
void Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue);
+
+ template <bool replace, bool limit>
+ void MergeImpl(IRowsBuffer& rowsBuffer, TSortingHeap& queue);
};
}
diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h
index c902ea46a67..19da041b42b 100644
--- a/ydb/core/formats/replace_key.h
+++ b/ydb/core/formats/replace_key.h
@@ -11,7 +11,7 @@ public:
: Columns(columns)
, Position(position)
{
- Y_VERIFY(Size() > 0 && Position < Column(0).length());
+ Y_VERIFY_DEBUG(Size() > 0 && Position < Column(0).length());
}
size_t Hash() const {
@@ -21,10 +21,10 @@ public:
// TODO: NULLs
template<typename T>
bool operator == (const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY(Size() == key.Size());
+ Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
- Y_VERIFY(Column(i).type_id() == key.Column(i).type_id());
+ Y_VERIFY_DEBUG(Column(i).type_id() == key.Column(i).type_id());
if (!TypedEquals(Column(i), Position, key.Column(i), key.Position)) {
return false;
@@ -35,7 +35,7 @@ public:
template<typename T>
bool operator < (const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY(Size() == key.Size());
+ Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
int cmp = CompareColumnValue(i, key, i);
@@ -50,7 +50,7 @@ public:
template<typename T>
bool LessNotNull(const TReplaceKeyTemplate<T>& key) const {
- Y_VERIFY(Size() == key.Size());
+ Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
int cmp = CompareColumnValue(i, key, i, true);
@@ -65,7 +65,7 @@ public:
template<typename T>
int CompareColumnValue(int column, const TReplaceKeyTemplate<T>& key, int keyColumn, bool notNull = false) const {
- Y_VERIFY(Column(column).type_id() == key.Column(keyColumn).type_id());
+ Y_VERIFY_DEBUG(Column(column).type_id() == key.Column(keyColumn).type_id());
if (notNull) {
return TypedCompare<true>(Column(column), Position, key.Column(keyColumn), key.Position);