diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-05-17 17:31:41 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-05-17 17:31:41 +0300 |
commit | f219f68442e33d822331e6f3828c359dbd3dc4cb (patch) | |
tree | 677fb9d01883b13e94bfd05dac64b49d5ab76e7e | |
parent | 2fe816aa3f450f6177e3918196218d468ffc3354 (diff) | |
download | ydb-f219f68442e33d822331e6f3828c359dbd3dc4cb.tar.gz |
KIKIMR-11746: faster TColumnEngineForLogs::StartInsert()
ref:75f81fa87929a6d6c092da506d00b0fbb9b4914b
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 63 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 7 |
2 files changed, 49 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index a0c83abdf9..57417038b0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -317,11 +317,10 @@ TVector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo return out; } -} - -THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules, - const TIndexInfo& indexInfo) { +template <typename T> +inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl( + const std::shared_ptr<arrow::RecordBatch>& batch, const T& tsGranules, const TIndexInfo& indexInfo) +{ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out; if (tsGranules.size() == 1) { @@ -362,6 +361,21 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<u return out; } +} + +THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules, + const TIndexInfo& indexInfo) +{ + return SliceIntoGranulesImpl(batch, tsGranules, indexInfo); +} + +THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules, + const TIndexInfo& indexInfo) +{ + return SliceIntoGranulesImpl(batch, tsGranules, indexInfo); +} TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits) : IndexInfo(info) @@ -625,14 +639,24 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector< for (auto& data : changes->DataToIndex) { ui64 pathId = data.PathId; + if (changes->PathToGranule.count(pathId)) { + continue; + } + if (PathGranules.count(pathId)) { if (PathsGranulesOverloaded.count(pathId)) { return {}; } - // FIXME: Copying all granules of a huge table might be heavy - changes->PathToGranule[pathId] = PathGranules[pathId]; + // TODO: cache PathToGranule for hot pathIds + const auto& src = PathGranules[pathId]; + auto& dst = changes->PathToGranule[pathId]; + dst.reserve(src.size()); + for (const auto& [ts, granule] : src) { + dst.emplace_back(std::make_pair(ts, granule)); + } } else { + // It could reserve more then needed in case of the same pathId in DataToIndex ++reserveGranules; } } @@ -678,7 +702,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: ui64 pathId = spg->Record.PathId; Y_VERIFY(PathGranules.count(pathId)); - for (auto& [ts, pathGranule] : PathGranules[pathId]) { + for (const auto& [ts, pathGranule] : PathGranules[pathId]) { if (pathGranule == granule) { changes->SrcGranule = {pathId, granule, ts}; break; @@ -710,8 +734,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T if (!PathGranules.count(pathId)) { continue; } - auto& pathGranules = PathGranules[pathId]; - for (auto& [_, granule]: pathGranules) { + + for (const auto& [_, granule]: PathGranules[pathId]) { Y_VERIFY(Granules.count(granule)); auto spg = Granules[granule]; Y_VERIFY(spg); @@ -767,8 +791,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash Y_VERIFY(!ttl.TierBorders.empty()); ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column); - auto& tsGranule = PathGranules[pathId]; - for (auto [ts, granule] : tsGranule) { + for (const auto& [ts, granule] : PathGranules[pathId]) { auto spg = Granules[granule]; Y_VERIFY(spg); @@ -807,11 +830,11 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const { Y_VERIFY(PathGranules.count(pathId)); - auto& pathGranules = PathGranules.find(pathId)->second; + const auto& pathGranules = PathGranules.find(pathId)->second; TVector<TVector<std::pair<ui64, ui64>>> emptyGranules; ui64 emptyStart = 0; - for (auto& [ts, granule]: pathGranules) { + for (const auto& [ts, granule]: pathGranules) { Y_VERIFY(Granules.count(granule)); auto spg = Granules.find(granule)->second; Y_VERIFY(spg); @@ -1188,8 +1211,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { return true; } - auto& pathGranules = PathGranules[rec.PathId]; - pathGranules.emplace(ts, rec.Granule); + PathGranules[rec.PathId].emplace(ts, rec.Granule); auto& spg = Granules[rec.Granule]; Y_VERIFY(!spg); spg = std::make_shared<TGranuleMeta>(rec); @@ -1281,10 +1303,13 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& c } } // Does insert have already splitted granule? - for (auto& [pathId, map] : changes.PathToGranule) { + for (const auto& [pathId, tsGranules] : changes.PathToGranule) { if (PathGranules.count(pathId)) { - if (PathGranules.find(pathId)->second.size() != map.size()) { - LOG_S_DEBUG("Cannot insert: splitted granules at tablet " << TabletId); + const auto& actualGranules = PathGranules.find(pathId)->second; + size_t expectedSize = tsGranules.size(); + if (actualGranules.size() != expectedSize) { + LOG_S_DEBUG("Cannot insert into splitted granules (actual: " << actualGranules.size() + << ", expected: " << expectedSize << ") at tablet " << TabletId); return false; } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index a7faeb2bd2..7a4f443ed3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -66,7 +66,7 @@ public: --ReservedGranuleIds; ui64 ts = 0; NewGranules.emplace(granule, std::make_pair(pathId, ts)); - PathToGranule[pathId].emplace(ts, granule); + PathToGranule[pathId].emplace_back(ts, granule); return true; } @@ -105,7 +105,7 @@ public: return numSplitInto; } - THashMap<ui64, TMap<ui64, ui64>> PathToGranule; // pathId -> {timestamp, granule} + THashMap<ui64, std::vector<std::pair<ui64, ui64>>> PathToGranule; // pathId -> {timestamp, granule} TSrcGranule SrcGranule; THashMap<ui64, std::pair<ui64, ui64>> NewGranules; // granule -> {pathId, key} THashMap<ui64, ui32> TmpGranuleIds; // ts -> tmp granule id @@ -261,5 +261,8 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& inde THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules, const TIndexInfo& indexInfo); +THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules, + const TIndexInfo& indexInfo); } |