diff options
author | chertus <[email protected]> | 2023-03-29 22:56:38 +0300 |
---|---|---|
committer | chertus <[email protected]> | 2023-03-29 22:56:38 +0300 |
commit | 7c9ee77626609670679db7c62bde1cf66775d9d1 (patch) | |
tree | b69f347c8fbee999d87b209828efc9e89f115b87 | |
parent | b657fafc4f4b9aaa42d9edb7f15fb287d2722b3d (diff) |
better limit portions to drop in StartCleanup()
4 files changed, 35 insertions, 38 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 4928f2110ff..250cdf1719b 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -161,7 +161,7 @@ public: icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize"); icb->RegisterSharedControl(MaxFallbackDataSize, "BlobCache.MaxFallbackDataSize"); - LOG_S_DEBUG("MaxCacheDataSize: " << (i64)MaxCacheDataSize + LOG_S_NOTICE("MaxCacheDataSize: " << (i64)MaxCacheDataSize << " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize << " InFlightDataSize: " << (i64)InFlightDataSize); @@ -296,7 +296,7 @@ private: void Handle(TEvBlobCache::TEvForgetBlob::TPtr& ev, const TActorContext&) { const TUnifiedBlobId& blobId = ev->Get()->BlobId; - LOG_S_DEBUG("Forgetting blob: " << blobId); + LOG_S_INFO("Forgetting blob: " << blobId); Forgets->Inc(); @@ -551,7 +551,7 @@ private: Y_VERIFY(!blobRanges.empty()); ui64 tabletId = blobRanges.front().BlobId.GetTabletId(); - LOG_S_DEBUG("Sending read from Tablet: " << tabletId + LOG_S_INFO("Sending read from Tablet: " << tabletId << " ranges: " << JoinStrings(blobRanges.begin(), blobRanges.end(), " ") << " cookie: " << cookie); @@ -584,7 +584,7 @@ private: auto cookieIt = CookieToRange.find(readCookie); if (cookieIt == CookieToRange.end()) { // This might only happen in case fo race between response and pipe close - LOG_S_INFO("Unknown read result cookie: " << readCookie); + LOG_S_NOTICE("Unknown read result cookie: " << readCookie); return; } @@ -624,12 +624,12 @@ private: const auto& record = ev->Get()->Record; ui64 tabletId = record.GetTabletId(); ui64 readCookie = ev->Cookie; - LOG_S_DEBUG("Got read result from tablet: " << tabletId); + LOG_S_INFO("Got read result from tablet: " << tabletId); auto cookieIt = CookieToRange.find(readCookie); if (cookieIt == CookieToRange.end()) { // This might only happen in case fo race between response and pipe close - LOG_S_INFO("Unknown read result cookie: " << readCookie); + LOG_S_NOTICE("Unknown read result cookie: " << readCookie); return; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d08cb1d695f..2dbe5127f3e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -772,6 +772,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T changes->PortionsToDrop.push_back(info); dropPortions.insert(portion); } + + if (affectedRecords > maxRecords) { + break; + } } if (affectedRecords > maxRecords) { @@ -788,22 +792,30 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T } // Add stale portions of alive paths - THashSet<ui64> activeCleanupGranules; + THashSet<ui64> cleanGranules; for (ui64 granule : CleanupGranules) { auto spg = Granules.find(granule)->second; Y_VERIFY(spg); + + bool isClean = true; for (auto& [portion, info] : spg->Portions) { - if (dropPortions.count(portion)) { + if (info.IsActive() || dropPortions.count(portion)) { continue; } - if (!info.IsActive()) { - activeCleanupGranules.insert(granule); - if (info.XSnapshot() < snapshot) { - affectedRecords += info.NumRecords(); - changes->PortionsToDrop.push_back(info); - } + isClean = false; + if (info.XSnapshot() < snapshot) { + affectedRecords += info.NumRecords(); + changes->PortionsToDrop.push_back(info); } + + if (affectedRecords > maxRecords) { + break; + } + } + + if (isClean) { + cleanGranules.insert(granule); } if (affectedRecords > maxRecords) { @@ -811,7 +823,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T break; } } - CleanupGranules.swap(activeCleanupGranules); + + for (ui64 granule : cleanGranules) { + CleanupGranules.erase(granule); + } return changes; } @@ -1011,15 +1026,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE for (auto& portionInfo : changes->SwitchedPortions) { CleanupGranules.insert(portionInfo.Granule()); } - } else if (changes->IsCleanup()) { - for (auto& portionInfo : changes->PortionsToDrop) { - ui64 granule = portionInfo.Granule(); - auto& meta = Granules[granule]; - Y_VERIFY(meta); - if (meta->AllActive()) { - CleanupGranules.erase(granule); - } - } } // Update overloaded granules (only if tx would be applyed) @@ -1370,7 +1376,7 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& c Y_VERIFY(!portionInfo.Empty()); ui64 granule = portionInfo.Granule(); if (GranulesInSplit.count(granule)) { - LOG_S_DEBUG("Cannot insert into splitting granule " << granule << " at tablet " << TabletId); + LOG_S_NOTICE("Cannot insert into splitting granule " << granule << " at tablet " << TabletId); return false; } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 292b4b2db2a..e408c9283e5 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -294,15 +294,6 @@ private: ui64 PathId() const { return Record.PathId; } bool Empty() const { return Portions.empty(); } - - bool AllActive() const { - for (auto& [_, portionInfo] : Portions) { - if (!portionInfo.IsActive()) { - return false; - } - } - return true; - } }; TIndexInfo IndexInfo; diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index df318d99a44..f5b808b00ce 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2366,7 +2366,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TTestBasicRuntime runtime; TTester::Setup(runtime); - runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO); TActorId sender = runtime.AllocateEdgeActor(); CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); @@ -2505,7 +2505,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 txId = 1000; // Ovewrite the same data multiple times to produce multiple portions at different timestamps - ui32 numWrites = 14; // trigger split granule compaction by GranuleBlobSplitSize + ui32 numWrites = 14; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); @@ -2570,7 +2570,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Advance the time in order to trigger GC TDuration delay = TDuration::Minutes(6); planStep += delay.MilliSeconds(); - numWrites = 10; // trigger in granule compaction by size + numWrites = 10; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); @@ -2601,7 +2601,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Advance the time and trigger some more compactions and cleanups planStep += 2*delay.MilliSeconds(); - numWrites = 10; + numWrites = 2; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); |