diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github). (English: the Connector test folder is moved from yql to ydb; kept in sync with GitHub.)
Diffstat (limited to 'contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp | 159 |
1 file changed, 159 insertions(+), 0 deletions(-)
diff --git a/contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp b/contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
new file mode 100644
index 00000000000..e7a431dc1d0
--- /dev/null
+++ b/contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
@@ -0,0 +1,159 @@
+#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
+#include <Columns/ColumnsNumber.h>
+#include <IO/WriteBuffer.h>
+
+namespace DB
+{
+
+static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192; /// Hard upper bound on rows buffered in `current_keys` while waiting for collapsing pairs.
+
+VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
+    const Block & header_,
+    size_t num_inputs,
+    SortDescription description_,
+    const String & sign_column_,
+    size_t max_block_size_rows_,
+    size_t max_block_size_bytes_,
+    WriteBuffer * out_row_sources_buf_,
+    bool use_average_block_sizes)
+    : IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
+    , merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
+    /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
+    , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
+    , current_keys(max_rows_in_queue)
+{
+    sign_column_number = header_.getPositionByName(sign_column_); /// Resolve the sign column position once; merge() reads it per row.
+}
+
+inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) /// Serialize one source mark; the 1-byte case avoids the generic write path.
+{
+    if constexpr (sizeof(RowSourcePart) == 1)
+        buffer.write(*reinterpret_cast<const char *>(&row_source));
+    else
+        buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
+}
+
+void VersionedCollapsingAlgorithm::insertGap(size_t gap_size) /// Flush `gap_size` pending source marks, still flagged as skipped (their rows were collapsed).
+{
+    if (out_row_sources_buf)
+    {
+        for (size_t i = 0; i < gap_size; ++i)
+        {
+            writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
+            current_row_sources.pop();
+        }
+    }
+}
+
+void VersionedCollapsingAlgorithm::insertRow(size_t skip_rows, const RowRef & row) /// Emit one surviving row, preceded by the marks of `skip_rows` collapsed rows.
+{
+    merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());
+
+    insertGap(skip_rows);
+
+    if (out_row_sources_buf)
+    {
+        current_row_sources.front().setSkipFlag(false); /// This row is kept, so clear the skip flag before writing its mark.
+        writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
+        current_row_sources.pop();
+    }
+}
+
+IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
+{
+    /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
+    while (queue.isValid())
+    {
+        SortCursor current = queue.current();
+
+        if (current->isLast() && skipLastRowFor(current->order))
+        {
+            /// Get the next block from the corresponding source, if there is one.
+            queue.removeTop();
+            return Status(current.impl->order);
+        }
+
+        RowRef current_row;
+
+        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()]; /// Sign value read from the Int8 sign column.
+
+        setRowRef(current_row, current);
+
+        /// At first, let's decide the number of rows needed to insert right now.
+        size_t num_rows_to_insert = 0;
+        if (!current_keys.empty())
+        {
+            auto key_differs = !current_row.hasEqualSortColumnsWith(current_keys.back());
+
+            if (key_differs) /// Flush whole queue
+                num_rows_to_insert = current_keys.size();
+            else if (current_keys.size() >= max_rows_in_queue) /// Flush single row if queue is big
+                num_rows_to_insert = 1;
+        }
+
+        /// Insert ready rows if any.
+        while (num_rows_to_insert)
+        {
+            const auto & row = current_keys.front();
+            auto gap = current_keys.frontGap();
+
+            insertRow(gap, row);
+
+            current_keys.popFront();
+
+            --num_rows_to_insert;
+
+            /// It's ok to return here, because we didn't affect queue.
+            if (merged_data.hasEnoughRows())
+                return Status(merged_data.pull());
+        }
+
+        if (current_keys.empty())
+        {
+            sign_in_queue = sign; /// All rows queued for one key share this sign (enforced by the branch below).
+            current_keys.pushBack(current_row);
+        }
+        else /// If queue is not empty, then current_row has the same key as in current_keys queue
+        {
+            if (sign == sign_in_queue)
+                current_keys.pushBack(current_row);
+            else
+            {
+                current_keys.popBack(); /// Opposite sign cancels the queue tail: drop both rows...
+                current_keys.pushGap(2); /// ...and record a gap of 2 so both source marks are later flushed as skipped.
+            }
+        }
+
+        if (out_row_sources_buf)
+            current_row_sources.emplace(current->order, true); /// Skipped by default; insertRow() clears the flag for rows that survive.
+
+        if (!current->isLast())
+        {
+            queue.next();
+        }
+        else
+        {
+            /// We take next block from the corresponding source, if there is one.
+            queue.removeTop();
+            return Status(current.impl->order);
+        }
+    }
+
+    /// All inputs exhausted: drain whatever is still buffered for the last key.
+    while (!current_keys.empty())
+    {
+        const auto & row = current_keys.front();
+        auto gap = current_keys.frontGap();
+
+        insertRow(gap, row);
+        current_keys.popFront();
+
+        if (merged_data.hasEnoughRows())
+            return Status(merged_data.pull());
+    }
+
+    /// Write information about last collapsed rows.
+    insertGap(current_keys.frontGap());
+    return Status(merged_data.pull(), true); /// `true` signals the merge is finished.
+}
+
+}
