diff options
author | stanly <stanly@yandex-team.com> | 2023-05-22 15:18:17 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-22 15:18:17 +0300 |
commit | 3fc6096f7bf293fd20e719251af09c25833f7f3e (patch) | |
tree | c2cb5ce9f223a69a6a5c5ed9e5cb289b0185906a | |
parent | 9953df7707289bff2073c94102c705023c44df65 (diff) | |
download | ydb-3fc6096f7bf293fd20e719251af09c25833f7f3e.tar.gz |
always use actual compaction limits
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.h | 12 |
7 files changed, 48 insertions, 50 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 3540dd6103b..22564ded182 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -737,7 +737,7 @@ bool TColumnShard::SetupIndexation() { } Y_VERIFY(data.size()); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(CompactionLimits.Get(), std::move(data)); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID()); return false; @@ -759,8 +759,8 @@ bool TColumnShard::SetupCompaction() { std::vector<std::unique_ptr<TEvPrivate::TEvCompaction>> events; while (ActiveCompaction < TSettings::MAX_ACTIVE_COMPACTIONS) { - TablesManager.MutablePrimaryIndex().UpdateCompactionLimits(CompactionLimits.Get()); - auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(LastCompactedGranule); + auto limits = CompactionLimits.Get(); + auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits, LastCompactedGranule); if (!compactionInfo || compactionInfo->Empty()) { if (events.empty()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); @@ -773,7 +773,7 @@ bool TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); ui64 outdatedStep = GetOutdatedStep(); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(outdatedStep, 0)); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(outdatedStep, 0), limits); if (!indexChanges) { if (events.empty()) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); @@ -859,7 +859,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; - auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, TLimits::MAX_TX_RECORDS); + auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, CompactionLimits.Get(), TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID()); return {}; diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index eb5e925588b..5669cf59061 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -156,8 +156,9 @@ public: virtual ~TColumnEngineChanges() = default; - explicit TColumnEngineChanges(EType type) + TColumnEngineChanges(const EType type, const TCompactionLimits& limits) : Type(type) + , Limits(limits) {} void SetBlobs(THashMap<TBlobRange, TString>&& blobs) { @@ -165,7 +166,7 @@ public: Blobs = std::move(blobs); } - EType Type{UNSPECIFIED}; + EType Type; TCompactionLimits Limits; TSnapshot InitSnapshot = TSnapshot::Zero(); TSnapshot ApplySnapshot = TSnapshot::Zero(); @@ -427,11 +428,11 @@ public: virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const = 0; - virtual std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) = 0; + virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) = 0; + virtual std::shared_ptr<TColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, - const TSnapshot& outdatedSnapshot) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, + const TSnapshot& outdatedSnapshot, const TCompactionLimits& limits) = 0; + virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, ui32 maxRecords) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; @@ -439,7 +440,6 @@ public: virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO - virtual void UpdateCompactionLimits(const TCompactionLimits& limits) = 0; virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { return 0; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d2102e74b1f..bfad74d8dea 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -393,7 +393,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl } } - UpdateOverloaded(Granules); + UpdateOverloaded(Granules, Limits); Y_VERIFY(!(LastPortion >> 63), "near to int overflow"); Y_VERIFY(!(LastGranule >> 63), "near to int overflow"); @@ -451,10 +451,10 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { return CountersTable->Load(db, callback); } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) { +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) { Y_VERIFY(dataToIndex.size()); - auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), Limits); + auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), limits); ui32 reserveGranules = 0; changes->InitSnapshot = LastSnapshot; @@ -491,11 +491,12 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vec } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info, - const TSnapshot& outdatedSnapshot) { + const TSnapshot& outdatedSnapshot, + const TCompactionLimits& limits) { Y_VERIFY(info); Y_VERIFY(info->Granules.size() == 1); - auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), Limits, LastSnapshot); + auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), limits, LastSnapshot); const ui64 granule = *changes->CompactionInfo->Granules.begin(); const auto gi = Granules.find(granule); @@ -523,7 +524,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: if (changes->CompactionInfo->InGranule) { const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot); - if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, Limits, completedSnap, changes->MergeBorders)) { + if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, limits, completedSnap, changes->MergeBorders)) { // Return granule to Compation list. This is equal to single compaction worker behaviour. CompactionGranules.insert(granule); return {}; @@ -537,9 +538,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, + const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, ui32 maxRecords) { - auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, Limits); + auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, limits); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -727,7 +729,7 @@ std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGran return emptyGranules; } -void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) { +void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits) { for (const auto& [granule, spg] : granules) { const ui64 pathId = spg->Record.PathId; @@ -740,7 +742,7 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr } // Size exceeds the configured limit. Mark granule as overloaded. - if (size >= Limits.GranuleOverloadSize) { + if (size >= limits.GranuleOverloadSize) { PathsGranulesOverloaded.emplace(pathId, granule); } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { // Size is under limit. Remove granule from the overloaded set. @@ -853,7 +855,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE } } - UpdateOverloaded(granules); + UpdateOverloaded(granules, indexChanges->Limits); } return true; } @@ -1348,7 +1350,7 @@ static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompa return differentBorders && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize); } -std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) { +std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) { if (CompactionGranules.empty()) { return {}; } @@ -1367,7 +1369,7 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact Y_VERIFY(gi != Granules.end()); bool inserted = false; - if (NeedSplit(gi->second->Portions, Limits, inserted)) { + if (NeedSplit(gi->second->Portions, limits, inserted)) { inGranule = false; granule = *it; CompactionGranules.erase(it); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 5b8cdf91733..93de430962e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -62,35 +62,32 @@ public: TChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits) - : TColumnEngineChanges(TColumnEngineChanges::INSERT) + : TColumnEngineChanges(TColumnEngineChanges::INSERT, limits) , DefaultMark(defaultMark) { - Limits = limits; DataToIndex = std::move(blobsToIndex); } TChanges(const TMark& defaultMark, std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits, const TSnapshot& snapshot) - : TColumnEngineChanges(TColumnEngineChanges::COMPACTION) + : TColumnEngineChanges(TColumnEngineChanges::COMPACTION, limits) , DefaultMark(defaultMark) { - Limits = limits; CompactionInfo = std::move(info); InitSnapshot = snapshot; } TChanges(const TMark& defaultMark, const TSnapshot& snapshot, const TCompactionLimits& limits) - : TColumnEngineChanges(TColumnEngineChanges::CLEANUP) + : TColumnEngineChanges(TColumnEngineChanges::CLEANUP, limits) , DefaultMark(defaultMark) { - Limits = limits; InitSnapshot = snapshot; } TChanges(const TMark& defaultMark, TColumnEngineChanges::EType type, const TSnapshot& applySnapshot) - : TColumnEngineChanges(type) + : TColumnEngineChanges(type, TCompactionLimits()) , DefaultMark(defaultMark) { ApplySnapshot = applySnapshot; @@ -214,10 +211,10 @@ public: public: bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override; - std::shared_ptr<TColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) override; + std::shared_ptr<TColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) override; std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, - const TSnapshot& outdatedSnapshot) override; - std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, + const TSnapshot& outdatedSnapshot, const TCompactionLimits& limits) override; + std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, ui32 maxRecords) override; std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; @@ -228,14 +225,13 @@ public: void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override; void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override; - void UpdateCompactionLimits(const TCompactionLimits& limits) override { Limits = limits; } std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const override; - std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override; + std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, ui64& lastCompactedGranule) override; private: using TMarksMap = std::map<TMark, ui64, TMark::TCompare>; @@ -253,7 +249,7 @@ private: }; TVersionedIndex VersionedIndex; - TCompactionLimits Limits; + const TCompactionLimits Limits; ui64 TabletId; std::shared_ptr<TGranulesTable> GranulesTable; std::shared_ptr<TColumnsTable> ColumnsTable; @@ -322,7 +318,7 @@ private: EStatsUpdateType updateType) const; bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const; - void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules); + void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const TCompactionLimits& limits); /// Return lists of adjacent empty granules for the path. std::vector<std::vector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 48d007f2a34..0803566198b 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -273,7 +273,7 @@ TCompactionLimits TestLimits() { bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { - std::shared_ptr<TColumnEngineChanges> changes = engine.StartInsert(std::move(dataToIndex)); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartInsert(TestLimits(), std::move(dataToIndex)); if (!changes) { return false; } @@ -308,11 +308,11 @@ struct TExpected { bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, const TExpected& expected) { ui64 lastCompactedGranule = 0; - auto compactionInfo = engine.Compact(lastCompactedGranule); + auto compactionInfo = engine.Compact(TestLimits(), lastCompactedGranule); UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1); UNIT_ASSERT(!compactionInfo->InGranule); - std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero()); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero(), TestLimits()); UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); @@ -338,7 +338,7 @@ bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TH bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) { THashSet<ui64> pathsToDrop; - std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop, 1000); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, TestLimits(), pathsToDrop, 1000); UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 74fc0c73660..f462ecbfad9 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -283,9 +283,9 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim } } -std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) { +std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords) { Y_VERIFY(PrimaryIndex); - return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords); + return PrimaryIndex->StartCleanup(snapshot, limits, PathsToDrop, maxRecords); } NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 987c258dbcd..afc4181a1b6 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -60,7 +60,7 @@ public: bool IsStandaloneTable() const { return Id == 0; } - + const TString& GetName() const { return Name; } @@ -180,17 +180,17 @@ public: const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const { return PrimaryIndex; } - + const NOlap::IColumnEngine& GetPrimaryIndexSafe() const { Y_VERIFY(!!PrimaryIndex); return *PrimaryIndex; } - + bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId); bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions); void Clear(); - + const TTableInfo& GetTable(const ui64 pathId) const; ui64 GetMemoryUsage() const; @@ -206,10 +206,10 @@ public: void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); - + void OnTtlUpdate(); - std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords); + std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords); private: void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); |