diff options
author | stanly <stanly@yandex-team.com> | 2023-04-07 14:29:52 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-04-07 14:29:52 +0300 |
commit | ae51311951780b298724a071fa84c0f8bd30a9f8 (patch) | |
tree | 21634a21883e955aa4a47cd758d50811ef2cc8a4 | |
parent | bebed7ab9b918d9bb64cc4d8ccf739be80fa1e4b (diff) | |
download | ydb-ae51311951780b298724a071fa84c0f8bd30a9f8.tar.gz |
get rid of TMap in TMarksGranules
* replace TMap with std::vector
* remove template parameter from SliceIntoGranulesImpl
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 169 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 9 |
3 files changed, 95 insertions, 91 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7db94e55bd4..73b5e83a3f4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,9 +1,13 @@ #include "column_engine_logs.h" #include "indexed_read_data.h" #include "filter.h" + #include <ydb/core/formats/one_batch_input_stream.h> #include <ydb/core/formats/merging_sorted_input_stream.h> +#include <concepts> +#include <span> + namespace NKikimr::NOlap { namespace { @@ -328,79 +332,60 @@ TVector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo return out; } -template <typename T> -inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl( - const std::shared_ptr<arrow::RecordBatch>& batch, const T& tsGranules, const TIndexInfo& indexInfo) +THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const std::vector<std::pair<TMark, ui64>>& granules, + const TIndexInfo& indexInfo) { THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out; - if (tsGranules.size() == 1) { - //Y_VERIFY(tsGranules.begin()->first.IsDefault()); - ui64 granule = tsGranules.begin()->second; - out.emplace(granule, batch); + if (granules.size() == 1) { + out.emplace(granules[0].second, batch); } else { - auto keyColumn = GetFirstPKColumn(indexInfo, batch); + const auto keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); - TVector<TMark> borders; - borders.reserve(tsGranules.size()); - for (auto& [ts, granule] : tsGranules) { - borders.push_back(ts); - } - - ui32 i = 0; i64 offset = 0; - for (auto& [ts, granule] : tsGranules) { - i64 end = keyColumn->length(); - if (i < borders.size() - 1) { - TMark border = borders[i + 1]; - end = NArrow::LowerBound(keyColumn, *border.Border, offset); - } - - i64 size = end - offset; - if (size) { - Y_VERIFY(size > 0); - Y_VERIFY(!out.count(granule)); - out.emplace(granule, batch->Slice(offset, size)); + for (size_t i = 0; i < granules.size(); ++i) { + const i64 end = (i + 1 == granules.size()) + // Just take the number of elements in the key column for the last granule. + ? keyColumn->length() + // Locate position of the next granule in the key. + : NArrow::LowerBound(keyColumn, *granules[i + 1].first.Border, offset); + + if (const i64 size = end - offset) { + Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second); } offset = end; - ++i; } } return out; } -THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, - const std::vector<std::pair<TMark, ui64>>& markGranules, - const TIndexInfo& indexInfo) -{ - return SliceIntoGranulesImpl(batch, markGranules, indexInfo); -} - } // namespace -THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, - const TIndexInfo& indexInfo) + +TColumnEngineForLogs::TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept + : Marks(std::move(marks)) { - return SliceIntoGranulesImpl(batch, Marks, indexInfo); + Y_VERIFY_DEBUG(std::is_sorted(Marks.begin(), Marks.end())); } TColumnEngineForLogs::TMarksGranules::TMarksGranules(std::vector<TMark>&& points) { std::sort(points.begin(), points.end()); Marks.reserve(points.size()); - ui32 counter = 0; - for (auto&& mark : points) { - Marks.emplace_back(std::move(mark), ++counter); + + for (size_t i = 0, end = points.size(); i != end; ++i) { + Marks.emplace_back(std::move(points[i]), i + 1); } } TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectInfo) { Marks.reserve(selectInfo.Granules.size()); - for (auto& rec : selectInfo.Granules) { + + for (const auto& rec : selectInfo.Granules) { Marks.emplace_back(std::make_pair(rec.Mark, rec.Granule)); } @@ -430,6 +415,14 @@ bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& i return false; } +THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> +TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const TIndexInfo& indexInfo) +{ + return NOlap::SliceIntoGranules(batch, Marks, indexInfo); +} + + TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits) : IndexInfo(std::move(info)) , Limits(limits) @@ -1875,31 +1868,38 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, return out; } -static ui64 TryMovePortions(TVector<TPortionInfo>& portions, - TMap<TMark, ui64>& tsIds, - TVector<std::pair<TPortionInfo, ui64>>& toMove, - const TMark& ts0) { - std::vector<const TPortionInfo*> compacted; - compacted.reserve(portions.size()); - std::vector<const TPortionInfo*> inserted; - inserted.reserve(portions.size()); +/// @param[in,out] portions unchanged or only inserted portions in the same orders +/// @param[in,out] tsIds unchanged or marks from compacted portions ordered by mark +/// @param[in,out] toMove unchanged or compacted portions ordered by primary key +static ui64 TryMovePortions(const TMark& ts0, + TVector<TPortionInfo>& portions, + std::vector<std::pair<TMark, ui64>>& tsIds, + TVector<std::pair<TPortionInfo, ui64>>& toMove) +{ + std::vector<TPortionInfo*> partitioned(portions.size()); + // Split portions by putting the inserted portions in the original order + // at the beginning of the buffer and the compacted portions at the end. + // The compacted portions will be put in the reversed order, but it will be sorted later. + const auto [inserted, compacted] = [&]() { + size_t l = 0; + size_t r = portions.size(); - for (auto& portionInfo : portions) { - if (portionInfo.IsInserted()) { - inserted.push_back(&portionInfo); - } else { - compacted.push_back(&portionInfo); + for (auto& portionInfo : portions) { + partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo; } - } + return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end())); + }(); + + // Do nothing if there are less than two compacted protions. if (compacted.size() < 2) { return 0; } - + // Order compacted portions by primary key. std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) { return NArrow::ScalarLess(*a->PkStart(), *b->PkStart()); }); - + // Check that there are no gaps between two adjacent portions in term of primary key range. for (size_t i = 0; i < compacted.size() - 1; ++i) { if (!NArrow::ScalarLess(*compacted[i]->PkEnd(), *compacted[i + 1]->PkStart())) { return 0; @@ -1910,23 +1910,22 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, ui64 numRows = 0; ui32 counter = 0; for (auto* portionInfo : compacted) { - TMark ts = ts0; - if (counter) { - ts = TMark(portionInfo->PkStart()); - } - ui32 rows = portionInfo->NumRows(); Y_VERIFY(rows); numRows += rows; - tsIds.emplace(ts, counter + 1); + tsIds.emplace_back((counter ? TMark(portionInfo->PkStart()) : ts0), counter + 1); toMove.emplace_back(std::move(*portionInfo), counter); ++counter; + // Ensure that std::move will take an effect. + static_assert(std::swappable<decltype(*portionInfo)>); } std::vector<TPortionInfo> out; out.reserve(inserted.size()); for (auto* portionInfo : inserted) { out.emplace_back(std::move(*portionInfo)); + // Ensure that std::move will take an effect. + static_assert(std::swappable<decltype(*portionInfo)>); } portions.swap(out); @@ -1934,15 +1933,15 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, } static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, - std::shared_ptr<TColumnEngineForLogs::TChanges> changes) { - ui64 pathId = changes->SrcGranule->PathId; - //ui64 granule = changes->SrcGranule->Granule; - TMark ts0 = changes->SrcGranule->Mark; + const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) +{ + const ui64 pathId = changes->SrcGranule->PathId; + const TMark ts0 = changes->SrcGranule->Mark; TVector<TPortionInfo>& portions = changes->SwitchedPortions; - TMap<TMark, ui64> tsIds; - ui64 movedRows = TryMovePortions(portions, tsIds, changes->PortionsToMove, ts0); - auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, (bool)movedRows); + std::vector<std::pair<TMark, ui64>> tsIds; + ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove); + auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, movedRows != 0); Y_VERIFY(srcBatches.size() == portions.size()); TVector<TString> blobs; @@ -1952,6 +1951,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, Y_VERIFY(changes->PortionsToMove.size() == tsIds.size()); Y_VERIFY(tsIds.begin()->first == ts0); + // Calculate total number of rows. ui64 numRows = movedRows; for (auto& batch : srcBatches) { numRows += batch->num_rows(); @@ -1960,31 +1960,34 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, // Recalculate new granules borders (if they are larger then portions) ui32 numSplitInto = changes->NumSplitInto(numRows); if (numSplitInto < tsIds.size()) { - ui32 rowsInGranule = numRows / numSplitInto; + const ui32 rowsInGranule = numRows / numSplitInto; Y_VERIFY(rowsInGranule); - TMap<TMark, ui64> newTsIds; - auto lastTs = tsIds.rbegin()->first; + TVector<std::pair<TMark, ui64>> newTsIds; ui32 tmpGranule = 0; ui32 sumRows = 0; - ui32 i = 0; - for (auto& [ts, _] : tsIds) { - if (sumRows >= rowsInGranule || (ts == lastTs && newTsIds.empty())) { + // Always insert mark of the source granule at the beginning. + newTsIds.emplace_back(ts0, 1); + + for (size_t i = 0, end = tsIds.size(); i != end; ++i) { + const TMark& ts = tsIds[i].first; + // Make new granule if the current number of rows is exceeded the allowed number of rows in the granule + // or there is the end of the ids and nothing was inserted so far. + if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) { ++tmpGranule; - newTsIds.emplace(ts, tmpGranule + 1); + newTsIds.emplace_back(ts, tmpGranule + 1); sumRows = 0; } auto& toMove = changes->PortionsToMove[i]; sumRows += toMove.first.NumRows(); toMove.second = tmpGranule; - ++i; } - newTsIds[ts0] = 1; tsIds.swap(newTsIds); } Y_VERIFY(tsIds.size() > 1); + Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1))); TColumnEngineForLogs::TMarksGranules marksGranules(std::move(tsIds)); // Slice inserted portions with granules' borders @@ -2028,7 +2031,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) { ui64 tmpGranule = changes->SetTmpGranule(pathId, mark); - for (auto& batch : idBatches[id]) { + for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs); Y_VERIFY(newPortions.size() > 0); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 18fe886043e..71eca591be5 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -97,16 +97,10 @@ public: using TPair = std::pair<TMark, ui64>; TMarksGranules() = default; + TMarksGranules(std::vector<TPair>&& marks) noexcept; TMarksGranules(std::vector<TMark>&& points); TMarksGranules(const TSelectInfo& selectInfo); - TMarksGranules(TMap<TMark, ui64>&& marks) { - Marks.reserve(marks.size()); - for (auto&& [m, granule] : marks) { - Marks.emplace_back(std::move(m), granule); - } - } - const std::vector<TPair>& GetOrderedMarks() const noexcept { return Marks; } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index fc9912cc738..8069a7bdb90 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -1,4 +1,5 @@ #pragma once + #include "defs.h" #include "columns_table.h" #include "index_info.h" @@ -268,4 +269,10 @@ struct TPortionInfo { } }; -} +/// Ensure that TPortionInfo can be effectively assigned by moving the value. +static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value); + +/// Ensure that TPortionInfo can be effectively constructed by moving the value. +static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value); + +} // namespace NKikimr::NOlap |