aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.com>2023-04-07 14:29:52 +0300
committerstanly <stanly@yandex-team.com>2023-04-07 14:29:52 +0300
commitae51311951780b298724a071fa84c0f8bd30a9f8 (patch)
tree21634a21883e955aa4a47cd758d50811ef2cc8a4
parentbebed7ab9b918d9bb64cc4d8ccf739be80fa1e4b (diff)
downloadydb-ae51311951780b298724a071fa84c0f8bd30a9f8.tar.gz
get rid of TMap in TMarksGranules
* replace TMap with std::vector * remove template parameter from SliceIntoGranulesImpl
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp169
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h8
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h9
3 files changed, 95 insertions, 91 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 7db94e55bd4..73b5e83a3f4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,9 +1,13 @@
#include "column_engine_logs.h"
#include "indexed_read_data.h"
#include "filter.h"
+
#include <ydb/core/formats/one_batch_input_stream.h>
#include <ydb/core/formats/merging_sorted_input_stream.h>
+#include <concepts>
+#include <span>
+
namespace NKikimr::NOlap {
namespace {
@@ -328,79 +332,60 @@ TVector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo
return out;
}
-template <typename T>
-inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl(
- const std::shared_ptr<arrow::RecordBatch>& batch, const T& tsGranules, const TIndexInfo& indexInfo)
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::pair<TMark, ui64>>& granules,
+ const TIndexInfo& indexInfo)
{
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
- if (tsGranules.size() == 1) {
- //Y_VERIFY(tsGranules.begin()->first.IsDefault());
- ui64 granule = tsGranules.begin()->second;
- out.emplace(granule, batch);
+ if (granules.size() == 1) {
+ out.emplace(granules[0].second, batch);
} else {
- auto keyColumn = GetFirstPKColumn(indexInfo, batch);
+ const auto keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
- TVector<TMark> borders;
- borders.reserve(tsGranules.size());
- for (auto& [ts, granule] : tsGranules) {
- borders.push_back(ts);
- }
-
- ui32 i = 0;
i64 offset = 0;
- for (auto& [ts, granule] : tsGranules) {
- i64 end = keyColumn->length();
- if (i < borders.size() - 1) {
- TMark border = borders[i + 1];
- end = NArrow::LowerBound(keyColumn, *border.Border, offset);
- }
-
- i64 size = end - offset;
- if (size) {
- Y_VERIFY(size > 0);
- Y_VERIFY(!out.count(granule));
- out.emplace(granule, batch->Slice(offset, size));
+ for (size_t i = 0; i < granules.size(); ++i) {
+ const i64 end = (i + 1 == granules.size())
+ // Just take the number of elements in the key column for the last granule.
+ ? keyColumn->length()
+ // Locate position of the next granule in the key.
+ : NArrow::LowerBound(keyColumn, *granules[i + 1].first.Border, offset);
+
+ if (const i64 size = end - offset) {
+ Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
}
offset = end;
- ++i;
}
}
return out;
}
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::vector<std::pair<TMark, ui64>>& markGranules,
- const TIndexInfo& indexInfo)
-{
- return SliceIntoGranulesImpl(batch, markGranules, indexInfo);
-}
-
} // namespace
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
- const TIndexInfo& indexInfo)
+
+TColumnEngineForLogs::TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept
+ : Marks(std::move(marks))
{
- return SliceIntoGranulesImpl(batch, Marks, indexInfo);
+ Y_VERIFY_DEBUG(std::is_sorted(Marks.begin(), Marks.end()));
}
TColumnEngineForLogs::TMarksGranules::TMarksGranules(std::vector<TMark>&& points) {
std::sort(points.begin(), points.end());
Marks.reserve(points.size());
- ui32 counter = 0;
- for (auto&& mark : points) {
- Marks.emplace_back(std::move(mark), ++counter);
+
+ for (size_t i = 0, end = points.size(); i != end; ++i) {
+ Marks.emplace_back(std::move(points[i]), i + 1);
}
}
TColumnEngineForLogs::TMarksGranules::TMarksGranules(const TSelectInfo& selectInfo) {
Marks.reserve(selectInfo.Granules.size());
- for (auto& rec : selectInfo.Granules) {
+
+ for (const auto& rec : selectInfo.Granules) {
Marks.emplace_back(std::make_pair(rec.Mark, rec.Granule));
}
@@ -430,6 +415,14 @@ bool TColumnEngineForLogs::TMarksGranules::MakePrecedingMark(const TIndexInfo& i
return false;
}
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
+TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TIndexInfo& indexInfo)
+{
+ return NOlap::SliceIntoGranules(batch, Marks, indexInfo);
+}
+
+
TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits)
: IndexInfo(std::move(info))
, Limits(limits)
@@ -1875,31 +1868,38 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
return out;
}
-static ui64 TryMovePortions(TVector<TPortionInfo>& portions,
- TMap<TMark, ui64>& tsIds,
- TVector<std::pair<TPortionInfo, ui64>>& toMove,
- const TMark& ts0) {
- std::vector<const TPortionInfo*> compacted;
- compacted.reserve(portions.size());
- std::vector<const TPortionInfo*> inserted;
- inserted.reserve(portions.size());
+/// @param[in,out] portions unchanged or only inserted portions in the same orders
+/// @param[in,out] tsIds unchanged or marks from compacted portions ordered by mark
+/// @param[in,out] toMove unchanged or compacted portions ordered by primary key
+static ui64 TryMovePortions(const TMark& ts0,
+ TVector<TPortionInfo>& portions,
+ std::vector<std::pair<TMark, ui64>>& tsIds,
+ TVector<std::pair<TPortionInfo, ui64>>& toMove)
+{
+ std::vector<TPortionInfo*> partitioned(portions.size());
+ // Split portions by putting the inserted portions in the original order
+ // at the beginning of the buffer and the compacted portions at the end.
+ // The compacted portions will be put in the reversed order, but it will be sorted later.
+ const auto [inserted, compacted] = [&]() {
+ size_t l = 0;
+ size_t r = portions.size();
- for (auto& portionInfo : portions) {
- if (portionInfo.IsInserted()) {
- inserted.push_back(&portionInfo);
- } else {
- compacted.push_back(&portionInfo);
+ for (auto& portionInfo : portions) {
+ partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo;
}
- }
+ return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
+ }();
+
+ // Do nothing if there are less than two compacted protions.
if (compacted.size() < 2) {
return 0;
}
-
+ // Order compacted portions by primary key.
std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
return NArrow::ScalarLess(*a->PkStart(), *b->PkStart());
});
-
+ // Check that there are no gaps between two adjacent portions in term of primary key range.
for (size_t i = 0; i < compacted.size() - 1; ++i) {
if (!NArrow::ScalarLess(*compacted[i]->PkEnd(), *compacted[i + 1]->PkStart())) {
return 0;
@@ -1910,23 +1910,22 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions,
ui64 numRows = 0;
ui32 counter = 0;
for (auto* portionInfo : compacted) {
- TMark ts = ts0;
- if (counter) {
- ts = TMark(portionInfo->PkStart());
- }
-
ui32 rows = portionInfo->NumRows();
Y_VERIFY(rows);
numRows += rows;
- tsIds.emplace(ts, counter + 1);
+ tsIds.emplace_back((counter ? TMark(portionInfo->PkStart()) : ts0), counter + 1);
toMove.emplace_back(std::move(*portionInfo), counter);
++counter;
+ // Ensure that std::move will take an effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
}
std::vector<TPortionInfo> out;
out.reserve(inserted.size());
for (auto* portionInfo : inserted) {
out.emplace_back(std::move(*portionInfo));
+ // Ensure that std::move will take an effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
}
portions.swap(out);
@@ -1934,15 +1933,15 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions,
}
static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
- std::shared_ptr<TColumnEngineForLogs::TChanges> changes) {
- ui64 pathId = changes->SrcGranule->PathId;
- //ui64 granule = changes->SrcGranule->Granule;
- TMark ts0 = changes->SrcGranule->Mark;
+ const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes)
+{
+ const ui64 pathId = changes->SrcGranule->PathId;
+ const TMark ts0 = changes->SrcGranule->Mark;
TVector<TPortionInfo>& portions = changes->SwitchedPortions;
- TMap<TMark, ui64> tsIds;
- ui64 movedRows = TryMovePortions(portions, tsIds, changes->PortionsToMove, ts0);
- auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, (bool)movedRows);
+ std::vector<std::pair<TMark, ui64>> tsIds;
+ ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove);
+ auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, movedRows != 0);
Y_VERIFY(srcBatches.size() == portions.size());
TVector<TString> blobs;
@@ -1952,6 +1951,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
Y_VERIFY(tsIds.begin()->first == ts0);
+ // Calculate total number of rows.
ui64 numRows = movedRows;
for (auto& batch : srcBatches) {
numRows += batch->num_rows();
@@ -1960,31 +1960,34 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
// Recalculate new granules borders (if they are larger then portions)
ui32 numSplitInto = changes->NumSplitInto(numRows);
if (numSplitInto < tsIds.size()) {
- ui32 rowsInGranule = numRows / numSplitInto;
+ const ui32 rowsInGranule = numRows / numSplitInto;
Y_VERIFY(rowsInGranule);
- TMap<TMark, ui64> newTsIds;
- auto lastTs = tsIds.rbegin()->first;
+ TVector<std::pair<TMark, ui64>> newTsIds;
ui32 tmpGranule = 0;
ui32 sumRows = 0;
- ui32 i = 0;
- for (auto& [ts, _] : tsIds) {
- if (sumRows >= rowsInGranule || (ts == lastTs && newTsIds.empty())) {
+ // Always insert mark of the source granule at the beginning.
+ newTsIds.emplace_back(ts0, 1);
+
+ for (size_t i = 0, end = tsIds.size(); i != end; ++i) {
+ const TMark& ts = tsIds[i].first;
+ // Make new granule if the current number of rows is exceeded the allowed number of rows in the granule
+ // or there is the end of the ids and nothing was inserted so far.
+ if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) {
++tmpGranule;
- newTsIds.emplace(ts, tmpGranule + 1);
+ newTsIds.emplace_back(ts, tmpGranule + 1);
sumRows = 0;
}
auto& toMove = changes->PortionsToMove[i];
sumRows += toMove.first.NumRows();
toMove.second = tmpGranule;
- ++i;
}
- newTsIds[ts0] = 1;
tsIds.swap(newTsIds);
}
Y_VERIFY(tsIds.size() > 1);
+ Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1)));
TColumnEngineForLogs::TMarksGranules marksGranules(std::move(tsIds));
// Slice inserted portions with granules' borders
@@ -2028,7 +2031,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
ui64 tmpGranule = changes->SetTmpGranule(pathId, mark);
- for (auto& batch : idBatches[id]) {
+ for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
Y_VERIFY(newPortions.size() > 0);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 18fe886043e..71eca591be5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -97,16 +97,10 @@ public:
using TPair = std::pair<TMark, ui64>;
TMarksGranules() = default;
+ TMarksGranules(std::vector<TPair>&& marks) noexcept;
TMarksGranules(std::vector<TMark>&& points);
TMarksGranules(const TSelectInfo& selectInfo);
- TMarksGranules(TMap<TMark, ui64>&& marks) {
- Marks.reserve(marks.size());
- for (auto&& [m, granule] : marks) {
- Marks.emplace_back(std::move(m), granule);
- }
- }
-
const std::vector<TPair>& GetOrderedMarks() const noexcept {
return Marks;
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index fc9912cc738..8069a7bdb90 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -1,4 +1,5 @@
#pragma once
+
#include "defs.h"
#include "columns_table.h"
#include "index_info.h"
@@ -268,4 +269,10 @@ struct TPortionInfo {
}
};
-}
+/// Ensure that TPortionInfo can be effectively assigned by moving the value.
+static_assert(std::is_nothrow_move_assignable<TPortionInfo>::value);
+
+/// Ensure that TPortionInfo can be effectively constructed by moving the value.
+static_assert(std::is_nothrow_move_constructible<TPortionInfo>::value);
+
+} // namespace NKikimr::NOlap