diff options
author | chertus <azuikov@ydb.tech> | 2023-03-23 19:09:30 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-23 19:09:30 +0300 |
commit | b0c73be20527d0e9658d4fff0bfaed641b5c5f28 (patch) | |
tree | 978540ae74420fc8eeea8b8af636388dec4e493a | |
parent | 9651b8fa0ffec22fd8adc9c5418593c1bd6d6778 (diff) | |
download | ydb-b0c73be20527d0e9658d4fff0bfaed641b5c5f28.tar.gz |
limit portions to drop in StartCleanup()
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 2 |
7 files changed, 30 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 1c0c1c936a..bda32dc0c0 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -937,7 +937,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; - auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop); + auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID()); return {}; @@ -947,7 +947,6 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { Y_VERIFY(changes->DataToIndex.empty()); Y_VERIFY(changes->AppendedPortions.empty()); - // TODO: limit PortionsToDrop total size. Delete them in small portions. // Filter PortionsToDrop TVector<NOlap::TPortionInfo> portionsCanBedropped; THashSet<ui64> excludedPortions; diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h index b8193e2992..633d7ca2e9 100644 --- a/ydb/core/tx/columnshard/defs.h +++ b/ydb/core/tx/columnshard/defs.h @@ -16,6 +16,7 @@ struct TLimits { static constexpr const ui32 MIN_SMALL_BLOBS_TO_INSERT = 200; static constexpr const ui32 MIN_BYTES_TO_INSERT = 4 * 1024 * 1024; static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024; + static constexpr const ui32 MAX_TX_RECORDS = 100000; TControlWrapper MinInsertBytes; TControlWrapper MaxInsertBytes; diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index b321e0d70a..4a4aecd7e9 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -322,8 +322,8 @@ public: virtual std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TSnapshot& outdatedSnapshot) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, - THashSet<ui64>& pathsToDrop) = 0; + virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, + ui32 maxRecords) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 170991ba2a..0af8f029ce 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -749,14 +749,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, - THashSet<ui64>& pathsToDrop) { + THashSet<ui64>& pathsToDrop, + ui32 maxRecords) { auto changes = std::make_shared<TChanges>(*this, snapshot, Limits); + ui32 affectedRecords = 0; // Add all portions from dropped paths THashSet<ui64> dropPortions; - THashSet<ui64> activePathsToDrop; + THashSet<ui64> emptyPaths; for (ui64 pathId : pathsToDrop) { if (!PathGranules.count(pathId)) { + emptyPaths.insert(pathId); continue; } @@ -765,13 +768,23 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T auto spg = Granules[granule]; Y_VERIFY(spg); for (auto& [portion, info] : spg->Portions) { + affectedRecords += info.NumRecords(); changes->PortionsToDrop.push_back(info); dropPortions.insert(portion); - activePathsToDrop.insert(pathId); } } + + if (affectedRecords > maxRecords) { + break; + } + } + for (ui64 pathId : emptyPaths) { + pathsToDrop.erase(pathId); + } + + if (affectedRecords > maxRecords) { + return changes; } - pathsToDrop.swap(activePathsToDrop); // Add stale portions of alive paths THashSet<ui64> activeCleanupGranules; @@ -786,10 +799,15 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T if (!info.IsActive()) { activeCleanupGranules.insert(granule); if (info.XSnapshot() < snapshot) { + affectedRecords += info.NumRecords(); changes->PortionsToDrop.push_back(info); } } } + + if (affectedRecords > maxRecords) { + break; + } } CleanupGranules.swap(activeCleanupGranules); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index bda24a647c..2bd7b86dda 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -252,8 +252,8 @@ public: std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override; std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TSnapshot& outdatedSnapshot) override; - std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, - THashSet<ui64>& pathsToDrop) override; + std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, + ui32 maxRecords) override; std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 15428e1157..a37d5ea4ac 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -55,7 +55,7 @@ struct TPortionInfo { bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; } bool CanHaveDups() const { return !Valid(); /* || IsInserted(); */ } bool CanIntersectOthers() const { return !Valid() || IsInserted(); } - ui32 NumRecords() const { return Records.size(); } + size_t NumRecords() const { return Records.size(); } bool EvictReady(size_t hotSize) const { return Meta.Produced == TPortionMeta::COMPACTED diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 7f46c0539a..2171084b2c 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -336,7 +336,7 @@ bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TH bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) { THashSet<ui64> pathsToDrop; - std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop, 1000); UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); |