summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <[email protected]>2023-03-29 22:56:38 +0300
committerchertus <[email protected]>2023-03-29 22:56:38 +0300
commit7c9ee77626609670679db7c62bde1cf66775d9d1 (patch)
treeb69f347c8fbee999d87b209828efc9e89f115b87
parentb657fafc4f4b9aaa42d9edb7f15fb287d2722b3d (diff)
better limit portions to drop in StartCleanup()
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp44
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h9
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp8
4 files changed, 35 insertions, 38 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index 4928f2110ff..250cdf1719b 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -161,7 +161,7 @@ public:
icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize");
icb->RegisterSharedControl(MaxFallbackDataSize, "BlobCache.MaxFallbackDataSize");
- LOG_S_DEBUG("MaxCacheDataSize: " << (i64)MaxCacheDataSize
+ LOG_S_NOTICE("MaxCacheDataSize: " << (i64)MaxCacheDataSize
<< " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize
<< " InFlightDataSize: " << (i64)InFlightDataSize);
@@ -296,7 +296,7 @@ private:
void Handle(TEvBlobCache::TEvForgetBlob::TPtr& ev, const TActorContext&) {
const TUnifiedBlobId& blobId = ev->Get()->BlobId;
- LOG_S_DEBUG("Forgetting blob: " << blobId);
+ LOG_S_INFO("Forgetting blob: " << blobId);
Forgets->Inc();
@@ -551,7 +551,7 @@ private:
Y_VERIFY(!blobRanges.empty());
ui64 tabletId = blobRanges.front().BlobId.GetTabletId();
- LOG_S_DEBUG("Sending read from Tablet: " << tabletId
+ LOG_S_INFO("Sending read from Tablet: " << tabletId
<< " ranges: " << JoinStrings(blobRanges.begin(), blobRanges.end(), " ")
<< " cookie: " << cookie);
@@ -584,7 +584,7 @@ private:
auto cookieIt = CookieToRange.find(readCookie);
if (cookieIt == CookieToRange.end()) {
// This might only happen in case fo race between response and pipe close
- LOG_S_INFO("Unknown read result cookie: " << readCookie);
+ LOG_S_NOTICE("Unknown read result cookie: " << readCookie);
return;
}
@@ -624,12 +624,12 @@ private:
const auto& record = ev->Get()->Record;
ui64 tabletId = record.GetTabletId();
ui64 readCookie = ev->Cookie;
- LOG_S_DEBUG("Got read result from tablet: " << tabletId);
+ LOG_S_INFO("Got read result from tablet: " << tabletId);
auto cookieIt = CookieToRange.find(readCookie);
if (cookieIt == CookieToRange.end()) {
// This might only happen in case fo race between response and pipe close
- LOG_S_INFO("Unknown read result cookie: " << readCookie);
+ LOG_S_NOTICE("Unknown read result cookie: " << readCookie);
return;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index d08cb1d695f..2dbe5127f3e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -772,6 +772,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
changes->PortionsToDrop.push_back(info);
dropPortions.insert(portion);
}
+
+ if (affectedRecords > maxRecords) {
+ break;
+ }
}
if (affectedRecords > maxRecords) {
@@ -788,22 +792,30 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
}
// Add stale portions of alive paths
- THashSet<ui64> activeCleanupGranules;
+ THashSet<ui64> cleanGranules;
for (ui64 granule : CleanupGranules) {
auto spg = Granules.find(granule)->second;
Y_VERIFY(spg);
+
+ bool isClean = true;
for (auto& [portion, info] : spg->Portions) {
- if (dropPortions.count(portion)) {
+ if (info.IsActive() || dropPortions.count(portion)) {
continue;
}
- if (!info.IsActive()) {
- activeCleanupGranules.insert(granule);
- if (info.XSnapshot() < snapshot) {
- affectedRecords += info.NumRecords();
- changes->PortionsToDrop.push_back(info);
- }
+ isClean = false;
+ if (info.XSnapshot() < snapshot) {
+ affectedRecords += info.NumRecords();
+ changes->PortionsToDrop.push_back(info);
}
+
+ if (affectedRecords > maxRecords) {
+ break;
+ }
+ }
+
+ if (isClean) {
+ cleanGranules.insert(granule);
}
if (affectedRecords > maxRecords) {
@@ -811,7 +823,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
break;
}
}
- CleanupGranules.swap(activeCleanupGranules);
+
+ for (ui64 granule : cleanGranules) {
+ CleanupGranules.erase(granule);
+ }
return changes;
}
@@ -1011,15 +1026,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
for (auto& portionInfo : changes->SwitchedPortions) {
CleanupGranules.insert(portionInfo.Granule());
}
- } else if (changes->IsCleanup()) {
- for (auto& portionInfo : changes->PortionsToDrop) {
- ui64 granule = portionInfo.Granule();
- auto& meta = Granules[granule];
- Y_VERIFY(meta);
- if (meta->AllActive()) {
- CleanupGranules.erase(granule);
- }
- }
}
// Update overloaded granules (only if tx would be applyed)
@@ -1370,7 +1376,7 @@ bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& c
Y_VERIFY(!portionInfo.Empty());
ui64 granule = portionInfo.Granule();
if (GranulesInSplit.count(granule)) {
- LOG_S_DEBUG("Cannot insert into splitting granule " << granule << " at tablet " << TabletId);
+ LOG_S_NOTICE("Cannot insert into splitting granule " << granule << " at tablet " << TabletId);
return false;
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 292b4b2db2a..e408c9283e5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -294,15 +294,6 @@ private:
ui64 PathId() const { return Record.PathId; }
bool Empty() const { return Portions.empty(); }
-
- bool AllActive() const {
- for (auto& [_, portionInfo] : Portions) {
- if (!portionInfo.IsActive()) {
- return false;
- }
- }
- return true;
- }
};
TIndexInfo IndexInfo;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index df318d99a44..f5b808b00ce 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2366,7 +2366,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
- runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO);
TActorId sender = runtime.AllocateEdgeActor();
CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
@@ -2505,7 +2505,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 txId = 1000;
// Ovewrite the same data multiple times to produce multiple portions at different timestamps
- ui32 numWrites = 14; // trigger split granule compaction by GranuleBlobSplitSize
+ ui32 numWrites = 14;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
@@ -2570,7 +2570,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Advance the time in order to trigger GC
TDuration delay = TDuration::Minutes(6);
planStep += delay.MilliSeconds();
- numWrites = 10; // trigger in granule compaction by size
+ numWrites = 10;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
@@ -2601,7 +2601,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Advance the time and trigger some more compactions and cleanups
planStep += 2*delay.MilliSeconds();
- numWrites = 10;
+ numWrites = 2;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));