aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-05-17 17:31:41 +0300
committerArtem Zuikov <chertus@gmail.com>2022-05-17 17:31:41 +0300
commitf219f68442e33d822331e6f3828c359dbd3dc4cb (patch)
tree677fb9d01883b13e94bfd05dac64b49d5ab76e7e
parent2fe816aa3f450f6177e3918196218d468ffc3354 (diff)
downloadydb-f219f68442e33d822331e6f3828c359dbd3dc4cb.tar.gz
KIKIMR-11746: faster TColumnEngineForLogs::StartInsert()
ref:75f81fa87929a6d6c092da506d00b0fbb9b4914b
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp63
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h7
2 files changed, 49 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index a0c83abdf9..57417038b0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -317,11 +317,10 @@ TVector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo
return out;
}
-}
-
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules,
- const TIndexInfo& indexInfo) {
+template <typename T>
+inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl(
+ const std::shared_ptr<arrow::RecordBatch>& batch, const T& tsGranules, const TIndexInfo& indexInfo)
+{
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
if (tsGranules.size() == 1) {
@@ -362,6 +361,21 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<u
return out;
}
+}
+
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules,
+ const TIndexInfo& indexInfo)
+{
+ return SliceIntoGranulesImpl(batch, tsGranules, indexInfo);
+}
+
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules,
+ const TIndexInfo& indexInfo)
+{
+ return SliceIntoGranulesImpl(batch, tsGranules, indexInfo);
+}
TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits)
: IndexInfo(info)
@@ -625,14 +639,24 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<
for (auto& data : changes->DataToIndex) {
ui64 pathId = data.PathId;
+ if (changes->PathToGranule.count(pathId)) {
+ continue;
+ }
+
if (PathGranules.count(pathId)) {
if (PathsGranulesOverloaded.count(pathId)) {
return {};
}
- // FIXME: Copying all granules of a huge table might be heavy
- changes->PathToGranule[pathId] = PathGranules[pathId];
+ // TODO: cache PathToGranule for hot pathIds
+ const auto& src = PathGranules[pathId];
+ auto& dst = changes->PathToGranule[pathId];
+ dst.reserve(src.size());
+ for (const auto& [ts, granule] : src) {
+ dst.emplace_back(std::make_pair(ts, granule));
+ }
} else {
+ // It could reserve more then needed in case of the same pathId in DataToIndex
++reserveGranules;
}
}
@@ -678,7 +702,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
ui64 pathId = spg->Record.PathId;
Y_VERIFY(PathGranules.count(pathId));
- for (auto& [ts, pathGranule] : PathGranules[pathId]) {
+ for (const auto& [ts, pathGranule] : PathGranules[pathId]) {
if (pathGranule == granule) {
changes->SrcGranule = {pathId, granule, ts};
break;
@@ -710,8 +734,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
if (!PathGranules.count(pathId)) {
continue;
}
- auto& pathGranules = PathGranules[pathId];
- for (auto& [_, granule]: pathGranules) {
+
+ for (const auto& [_, granule]: PathGranules[pathId]) {
Y_VERIFY(Granules.count(granule));
auto spg = Granules[granule];
Y_VERIFY(spg);
@@ -767,8 +791,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
Y_VERIFY(!ttl.TierBorders.empty());
ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column);
- auto& tsGranule = PathGranules[pathId];
- for (auto [ts, granule] : tsGranule) {
+ for (const auto& [ts, granule] : PathGranules[pathId]) {
auto spg = Granules[granule];
Y_VERIFY(spg);
@@ -807,11 +830,11 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const {
Y_VERIFY(PathGranules.count(pathId));
- auto& pathGranules = PathGranules.find(pathId)->second;
+ const auto& pathGranules = PathGranules.find(pathId)->second;
TVector<TVector<std::pair<ui64, ui64>>> emptyGranules;
ui64 emptyStart = 0;
- for (auto& [ts, granule]: pathGranules) {
+ for (const auto& [ts, granule]: pathGranules) {
Y_VERIFY(Granules.count(granule));
auto spg = Granules.find(granule)->second;
Y_VERIFY(spg);
@@ -1188,8 +1211,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
return true;
}
- auto& pathGranules = PathGranules[rec.PathId];
- pathGranules.emplace(ts, rec.Granule);
+ PathGranules[rec.PathId].emplace(ts, rec.Granule);
auto& spg = Granules[rec.Granule];
Y_VERIFY(!spg);
spg = std::make_shared<TGranuleMeta>(rec);
@@ -1281,10 +1303,13 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& c
}
}
// Does insert have already splitted granule?
- for (auto& [pathId, map] : changes.PathToGranule) {
+ for (const auto& [pathId, tsGranules] : changes.PathToGranule) {
if (PathGranules.count(pathId)) {
- if (PathGranules.find(pathId)->second.size() != map.size()) {
- LOG_S_DEBUG("Cannot insert: splitted granules at tablet " << TabletId);
+ const auto& actualGranules = PathGranules.find(pathId)->second;
+ size_t expectedSize = tsGranules.size();
+ if (actualGranules.size() != expectedSize) {
+ LOG_S_DEBUG("Cannot insert into splitted granules (actual: " << actualGranules.size()
+ << ", expected: " << expectedSize << ") at tablet " << TabletId);
return false;
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index a7faeb2bd2..7a4f443ed3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -66,7 +66,7 @@ public:
--ReservedGranuleIds;
ui64 ts = 0;
NewGranules.emplace(granule, std::make_pair(pathId, ts));
- PathToGranule[pathId].emplace(ts, granule);
+ PathToGranule[pathId].emplace_back(ts, granule);
return true;
}
@@ -105,7 +105,7 @@ public:
return numSplitInto;
}
- THashMap<ui64, TMap<ui64, ui64>> PathToGranule; // pathId -> {timestamp, granule}
+ THashMap<ui64, std::vector<std::pair<ui64, ui64>>> PathToGranule; // pathId -> {timestamp, granule}
TSrcGranule SrcGranule;
THashMap<ui64, std::pair<ui64, ui64>> NewGranules; // granule -> {pathId, key}
THashMap<ui64, ui32> TmpGranuleIds; // ts -> tmp granule id
@@ -261,5 +261,8 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& inde
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules,
const TIndexInfo& indexInfo);
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules,
+ const TIndexInfo& indexInfo);
}