diff options
author | stanly <stanly@yandex-team.com> | 2023-05-10 15:18:17 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-10 15:18:17 +0300 |
commit | 5d0638d8bfa8a45e6cb9a5e2d224aa44edcd0d31 (patch) | |
tree | 063fa54e2d2fd4a16184e66555c5729a2dc9bffb | |
parent | 3ac9d23ff8e6758cb41d0d0c2d74699e34d74104 (diff) | |
download | ydb-5d0638d8bfa8a45e6cb9a5e2d224aa44edcd0d31.tar.gz |
cleanup compaction code
7 files changed, 78 insertions, 106 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 7a8f1f6e91c..cd030c61870 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -121,7 +121,7 @@ public: virtual ~TColumnEngineChanges() = default; - TColumnEngineChanges(EType type) + explicit TColumnEngineChanges(EType type) : Type(type) {} diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 654504c61cb..352c441bde4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,8 +1,7 @@ #include "column_engine_logs.h" -#include "indexed_read_data.h" -#include "index_logic_logs.h" - #include "filter.h" +#include "index_logic_logs.h" +#include "indexed_read_data.h" #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> @@ -467,7 +466,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<TInsertedData>&& dataToIndex) { Y_VERIFY(dataToIndex.size()); - auto changes = std::make_shared<TChanges>(*this, std::move(dataToIndex), Limits); + auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(dataToIndex), Limits); ui32 reserveGranules = 0; changes->InitSnapshot = LastSnapshot; @@ -487,13 +486,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector< // TODO: cache PathToGranule for hot pathIds const auto& src = PathGranules[pathId]; - auto& dst = changes->PathToGranule[pathId]; - dst.reserve(src.size()); - for (const auto& [ts, granule] : src) { - dst.emplace_back(ts, granule); - } + changes->PathToGranule[pathId].assign(src.begin(), src.end()); } else { - // It could reserve more then needed in case of the same pathId in DataToIndex + // It could reserve more than needed in case of the same pathId in DataToIndex ++reserveGranules; } } @@ -510,35 +505,26 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector< std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info, const TSnapshot& outdatedSnapshot) { Y_VERIFY(info); + Y_VERIFY(info->Granules.size() == 1); - auto changes = std::make_shared<TChanges>(*this, std::move(info), Limits); - changes->InitSnapshot = LastSnapshot; - - Y_VERIFY(changes->CompactionInfo->Granules.size() == 1); + auto changes = std::make_shared<TChanges>(TMark(MarkSchema), std::move(info), Limits, LastSnapshot); - ui64 granule = *changes->CompactionInfo->Granules.begin(); - auto& portions = changes->SwitchedPortions; - { - auto spg = Granules.find(granule)->second; - Y_VERIFY(spg); - - auto actualPortions = GetActualPortions(spg->Portions); - Y_VERIFY(!actualPortions.empty()); - portions.reserve(actualPortions.size()); + const ui64 granule = *changes->CompactionInfo->Granules.begin(); + const auto gi = Granules.find(granule); + // Check granule exists. + Y_VERIFY(gi != Granules.end()); - for (auto* portionInfo : actualPortions) { - Y_VERIFY(!portionInfo->Empty()); - Y_VERIFY(portionInfo->Granule() == granule); - portions.push_back(*portionInfo); + changes->SwitchedPortions.reserve(gi->second->Portions.size()); + // Collect active portions for the granule. + for (const auto& [_, portionInfo] : gi->second->Portions) { + if (portionInfo.IsActive()) { + changes->SwitchedPortions.push_back(portionInfo); } } - Y_VERIFY(Granules.contains(granule)); - auto& spg = Granules[granule]; - Y_VERIFY(spg); - ui64 pathId = spg->Record.PathId; + const ui64 pathId = gi->second->Record.PathId; Y_VERIFY(PathGranules.contains(pathId)); - + // Locate mark for the granule. for (const auto& [mark, pathGranule] : PathGranules[pathId]) { if (pathGranule == granule) { changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark); @@ -548,22 +534,22 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: Y_VERIFY(changes->SrcGranule); if (changes->CompactionInfo->InGranule) { - TSnapshot completedSnap = (outdatedSnapshot < LastSnapshot) ? LastSnapshot : outdatedSnapshot; - if (!InitInGranuleMerge(changes->SrcGranule->Mark, portions, Limits, completedSnap, changes->MergeBorders)) { + const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot); + if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, Limits, completedSnap, changes->MergeBorders)) { return {}; } } else { GranulesInSplit.insert(granule); } - Y_VERIFY(!portions.empty()); + Y_VERIFY(!changes->SwitchedPortions.empty()); return changes; } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) { - auto changes = std::make_shared<TChanges>(*this, snapshot, Limits); + auto changes = std::make_shared<TChanges>(TMark(MarkSchema), snapshot, Limits); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -650,7 +636,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash } TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot - auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot); + auto changes = std::make_shared<TChanges>(TMark(MarkSchema), TColumnEngineChanges::TTL, fakeSnapshot); ui64 evicttionSize = 0; bool allowEviction = true; ui64 dropBlobs = 0; @@ -1312,26 +1298,19 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot return out; } -static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompactionLimits& limits, ui32& inserted) { +static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompactionLimits& limits, bool& inserted) { if (actual.size() < 2) { return false; } - inserted = 0; ui64 sumSize = 0; ui64 sumMaxSize = 0; - std::shared_ptr<arrow::Scalar> minPk0; - std::shared_ptr<arrow::Scalar> maxPk0; - if (actual.size()) { - actual[0]->MinMaxValue(actual[0]->FirstPkColumn, minPk0, maxPk0); - } + auto [minPk0, maxPk0] = actual[0]->MinMaxValue(actual[0]->FirstPkColumn); bool pkEqual = !!minPk0 && !!maxPk0 && arrow::ScalarEquals(*minPk0, *maxPk0); for (auto* portionInfo : actual) { Y_VERIFY(portionInfo); if (pkEqual) { - std::shared_ptr<arrow::Scalar> minPkCurrent; - std::shared_ptr<arrow::Scalar> maxPkCurrent; - portionInfo->MinMaxValue(portionInfo->FirstPkColumn, minPkCurrent, maxPkCurrent); + auto [minPkCurrent, maxPkCurrent] = portionInfo->MinMaxValue(portionInfo->FirstPkColumn); pkEqual = !!minPkCurrent && !!maxPkCurrent && arrow::ScalarEquals(*minPk0, *minPkCurrent) && arrow::ScalarEquals(*maxPk0, *maxPkCurrent); } @@ -1339,12 +1318,11 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact sumSize += sizes.first; sumMaxSize += sizes.second; if (portionInfo->IsInserted()) { - ++inserted; + inserted = true; } } - return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize - || sumSize >= limits.GranuleOverloadSize); + return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize); } std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) { @@ -1352,50 +1330,43 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact return {}; } - std::optional<ui64> outGranule; + ui64 granule = 0; bool inGranule = true; - auto it = CompactionGranules.upper_bound(lastCompactedGranule); - if (it == CompactionGranules.end()) { - it = CompactionGranules.begin(); - } while (!CompactionGranules.empty()) { - Y_VERIFY(it != CompactionGranules.end()); - ui64 granule = *it; - Y_VERIFY(Granules.contains(granule)); - auto spg = Granules.find(granule)->second; - Y_VERIFY(spg); + // Start from the beggining if the end is reached. + if (it == CompactionGranules.end()) { + it = CompactionGranules.begin(); + } + const auto gi = Granules.find(*it); + // Check granule exists. + Y_VERIFY(gi != Granules.end()); // We need only actual portions here (with empty XPlanStep:XTxId) - auto actualPortions = GetActualPortions(spg->Portions); - if (!actualPortions.empty()) { - ui32 inserted = 0; - bool needSplit = NeedSplit(actualPortions, Limits, inserted); - if (needSplit) { + if (const auto& actualPortions = GetActualPortions(gi->second->Portions); !actualPortions.empty()) { + bool inserted = false; + if (NeedSplit(actualPortions, Limits, inserted)) { inGranule = false; - outGranule = granule; + granule = *it; break; } else if (inserted) { - outGranule = granule; + granule = *it; break; } } - + // Nothing to compact in the current granule. Throw it. it = CompactionGranules.erase(it); - if (it == CompactionGranules.end()) { - it = CompactionGranules.begin(); - } } - if (outGranule) { + if (granule) { auto info = std::make_unique<TCompactionInfo>(); - info->Granules.insert(*outGranule); + info->Granules.insert(granule); info->InGranule = inGranule; - lastCompactedGranule = *outGranule; + lastCompactedGranule = granule; return info; } return {}; } -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 352206ed7b8..374b044e011 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -60,37 +60,38 @@ public: {} }; - TChanges(const TColumnEngineForLogs& engine, + TChanges(const TMark& defaultMark, TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits) : TColumnEngineChanges(TColumnEngineChanges::INSERT) - , DefaultMark(engine.GetDefaultMark()) + , DefaultMark(defaultMark) { Limits = limits; DataToIndex = std::move(blobsToIndex); } - TChanges(const TColumnEngineForLogs& engine, - std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits) + TChanges(const TMark& defaultMark, + std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits, const TSnapshot& snapshot) : TColumnEngineChanges(TColumnEngineChanges::COMPACTION) - , DefaultMark(engine.GetDefaultMark()) + , DefaultMark(defaultMark) { Limits = limits; CompactionInfo = std::move(info); + InitSnapshot = snapshot; } - TChanges(const TColumnEngineForLogs& engine, + TChanges(const TMark& defaultMark, const TSnapshot& snapshot, const TCompactionLimits& limits) : TColumnEngineChanges(TColumnEngineChanges::CLEANUP) - , DefaultMark(engine.GetDefaultMark()) + , DefaultMark(defaultMark) { Limits = limits; InitSnapshot = snapshot; } - TChanges(const TColumnEngineForLogs& engine, + TChanges(const TMark& defaultMark, TColumnEngineChanges::EType type, const TSnapshot& applySnapshot) : TColumnEngineChanges(type) - , DefaultMark(engine.GetDefaultMark()) + , DefaultMark(defaultMark) { ApplySnapshot = applySnapshot; } @@ -142,7 +143,7 @@ public: return std::max<ui32>(2, numSplitInto); } - TMark DefaultMark; + const TMark DefaultMark; THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule} std::optional<TSrcGranule> SrcGranule; THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key} @@ -197,11 +198,14 @@ public: return TMark::Deserialize(key, MarkSchema); } - TMark GetDefaultMark() const { - return TMark(MarkSchema); - } + const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override; + const TColumnEngineStats& GetTotalStats() override; + ui64 MemoryUsage() const override; + TSnapshot LastUpdate() const override { return LastSnapshot; } +public: bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override; + std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override; std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TSnapshot& outdatedSnapshot) override; @@ -209,15 +213,16 @@ public: ui32 maxRecords) override; std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; + bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) override; + void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override; + void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override; void UpdateCompactionLimits(const TCompactionLimits& limits) override { Limits = limits; } - const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override; - const TColumnEngineStats& GetTotalStats() override; - ui64 MemoryUsage() const override; - TSnapshot LastUpdate() const override { return LastSnapshot; } + + std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, @@ -259,6 +264,7 @@ private: ui64 LastGranule; TSnapshot LastSnapshot = TSnapshot::Zero(); +private: void ClearIndex() { Granules.clear(); PathGranules.clear(); @@ -298,4 +304,4 @@ private: TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 664d9bc3a54..5ac90c67303 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -162,14 +162,12 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord } } -void TPortionInfo::MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const { +std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> TPortionInfo::MinMaxValue(const ui32 columnId) const { auto it = Meta.ColumnMeta.find(columnId); if (it == Meta.ColumnMeta.end()) { - minValue = nullptr; - maxValue = nullptr; + return std::make_tuple(std::shared_ptr<arrow::Scalar>(), std::shared_ptr<arrow::Scalar>()); } else { - minValue = it->second.Min; - maxValue = it->second.Max; + return std::make_tuple(it->second.Min, it->second.Max); } } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index ec970378229..6f00a50dc1e 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -211,7 +211,8 @@ struct TPortionInfo { void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName); void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); - void MinMaxValue(const ui32 columnId, std::shared_ptr<arrow::Scalar>& minValue, std::shared_ptr<arrow::Scalar>& maxValue) const; + + std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxValue(const ui32 columnId) const; std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index acd16bea9a8..69a6389bdd9 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -44,9 +44,7 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo bool matchFrom = false; bool matchTo = false; for (auto&& c : indexInfo.GetReplaceKey()->field_names()) { - std::shared_ptr<arrow::Scalar> minValue; - std::shared_ptr<arrow::Scalar> maxValue; - info.MinMaxValue(indexInfo.GetColumnId(c), minValue, maxValue); + const auto& [minValue, maxValue ] = info.MinMaxValue(indexInfo.GetColumnId(c)); if (!matchFrom) { const int result = PredicateFrom.MatchScalar(idx, maxValue); if (result < 0) { diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 912ce96f24f..1f320b18fae 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -184,9 +184,7 @@ void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std:: std::vector<std::shared_ptr<arrow::Scalar>> maxRecord; for (auto&& i : indexInfo.GetReplaceKey()->fields()) { const ui32 columnId = indexInfo.GetColumnId(i->name()); - std::shared_ptr<arrow::Scalar> minScalar; - std::shared_ptr<arrow::Scalar> maxScalar; - PortionInfo->MinMaxValue(columnId, minScalar, maxScalar); + const auto& [minScalar, maxScalar] = PortionInfo->MinMaxValue(columnId); if (!FirstPK && !minScalar) { FirstPK = nullptr; ReverseLastPK = nullptr; |