about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author chertus <azuikov@ydb.tech> 2023-03-23 19:09:30 +0300
committer chertus <azuikov@ydb.tech> 2023-03-23 19:09:30 +0300
commit b0c73be20527d0e9658d4fff0bfaed641b5c5f28 (patch)
tree 978540ae74420fc8eeea8b8af636388dec4e493a
parent 9651b8fa0ffec22fd8adc9c5418593c1bd6d6778 (diff)
download ydb-b0c73be20527d0e9658d4fff0bfaed641b5c5f28.tar.gz
limit portions to drop in StartCleanup()
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/defs.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 26
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/portion_info.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/ut_logs_engine.cpp 2
7 files changed, 30 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 1c0c1c936a..bda32dc0c0 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -937,7 +937,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
- auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop);
+ auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS);
if (!changes) {
LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID());
return {};
@@ -947,7 +947,6 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
Y_VERIFY(changes->DataToIndex.empty());
Y_VERIFY(changes->AppendedPortions.empty());
- // TODO: limit PortionsToDrop total size. Delete them in small portions.
// Filter PortionsToDrop
TVector<NOlap::TPortionInfo> portionsCanBedropped;
THashSet<ui64> excludedPortions;
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index b8193e2992..633d7ca2e9 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -16,6 +16,7 @@ struct TLimits {
static constexpr const ui32 MIN_SMALL_BLOBS_TO_INSERT = 200;
static constexpr const ui32 MIN_BYTES_TO_INSERT = 4 * 1024 * 1024;
static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024;
+ static constexpr const ui32 MAX_TX_RECORDS = 100000;
TControlWrapper MinInsertBytes;
TControlWrapper MaxInsertBytes;
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index b321e0d70a..4a4aecd7e9 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -322,8 +322,8 @@ public:
virtual std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
const TSnapshot& outdatedSnapshot) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
- THashSet<ui64>& pathsToDrop) = 0;
+ virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop,
+ ui32 maxRecords) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 170991ba2a..0af8f029ce 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -749,14 +749,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
}
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
- THashSet<ui64>& pathsToDrop) {
+ THashSet<ui64>& pathsToDrop,
+ ui32 maxRecords) {
auto changes = std::make_shared<TChanges>(*this, snapshot, Limits);
+ ui32 affectedRecords = 0;
// Add all portions from dropped paths
THashSet<ui64> dropPortions;
- THashSet<ui64> activePathsToDrop;
+ THashSet<ui64> emptyPaths;
for (ui64 pathId : pathsToDrop) {
if (!PathGranules.count(pathId)) {
+ emptyPaths.insert(pathId);
continue;
}
@@ -765,13 +768,23 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
auto spg = Granules[granule];
Y_VERIFY(spg);
for (auto& [portion, info] : spg->Portions) {
+ affectedRecords += info.NumRecords();
changes->PortionsToDrop.push_back(info);
dropPortions.insert(portion);
- activePathsToDrop.insert(pathId);
}
}
+
+ if (affectedRecords > maxRecords) {
+ break;
+ }
+ }
+ for (ui64 pathId : emptyPaths) {
+ pathsToDrop.erase(pathId);
+ }
+
+ if (affectedRecords > maxRecords) {
+ return changes;
}
- pathsToDrop.swap(activePathsToDrop);
// Add stale portions of alive paths
THashSet<ui64> activeCleanupGranules;
@@ -786,10 +799,15 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
if (!info.IsActive()) {
activeCleanupGranules.insert(granule);
if (info.XSnapshot() < snapshot) {
+ affectedRecords += info.NumRecords();
changes->PortionsToDrop.push_back(info);
}
}
}
+
+ if (affectedRecords > maxRecords) {
+ break;
+ }
}
CleanupGranules.swap(activeCleanupGranules);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index bda24a647c..2bd7b86dda 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -252,8 +252,8 @@ public:
std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override;
std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
const TSnapshot& outdatedSnapshot) override;
- std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
- THashSet<ui64>& pathsToDrop) override;
+ std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop,
+ ui32 maxRecords) override;
std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 15428e1157..a37d5ea4ac 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -55,7 +55,7 @@ struct TPortionInfo {
bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; }
bool CanHaveDups() const { return !Valid(); /* || IsInserted(); */ }
bool CanIntersectOthers() const { return !Valid() || IsInserted(); }
- ui32 NumRecords() const { return Records.size(); }
+ size_t NumRecords() const { return Records.size(); }
bool EvictReady(size_t hotSize) const {
return Meta.Produced == TPortionMeta::COMPACTED
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 7f46c0539a..2171084b2c 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -336,7 +336,7 @@ bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TH
bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) {
THashSet<ui64> pathsToDrop;
- std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop);
+ std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop, 1000);
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);