aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-06-30 13:04:38 +0300
committerkungasc <kungasc@yandex-team.com>2023-06-30 13:04:38 +0300
commitfc83a97aeeb5f273651b72e2c8973b4e976e6352 (patch)
treeeb3948518ce7f0e442c0c0934a712b56c437cfe0
parent1962b16a839f1b905bba4a0606c1f352cfb12d59 (diff)
downloadydb-fc83a97aeeb5f273651b72e2c8973b4e976e6352.tar.gz
Limit shared cache with passive pages
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp4
-rw-r--r--ydb/core/protos/shared_cache.proto1
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.cpp58
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.h5
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/runner.h10
-rw-r--r--ydb/core/testlib/basics/services.cpp4
-rw-r--r--ydb/core/util/cache_cache.h74
7 files changed, 108 insertions, 48 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 51eba3cae4d..83f765ed441 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1387,7 +1387,7 @@ TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConf
void TSharedCacheInitializer::InitializeServices(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData) {
- auto config = MakeIntrusive<TSharedPageCacheConfig>();
+ auto config = MakeHolder<TSharedPageCacheConfig>();
NKikimrSharedCache::TSharedCacheConfig cfg;
if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasSharedCacheConfig()) {
@@ -1410,7 +1410,7 @@ void TSharedCacheInitializer::InitializeServices(
config->Counters = new TSharedPageCacheCounters(sausageGroup);
setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0),
- TActorSetupCmd(CreateSharedPageCache(config.Get()), TMailboxType::ReadAsFilled, appData->UserPoolId)));
+ TActorSetupCmd(CreateSharedPageCache(std::move(config)), TMailboxType::ReadAsFilled, appData->UserPoolId)));
auto *configurator = NConsole::CreateSharedCacheConfigurator();
setup->LocalServices.emplace_back(TActorId(),
diff --git a/ydb/core/protos/shared_cache.proto b/ydb/core/protos/shared_cache.proto
index bcfa8d0593b..8cca85c0f90 100644
--- a/ydb/core/protos/shared_cache.proto
+++ b/ydb/core/protos/shared_cache.proto
@@ -5,4 +5,5 @@ message TSharedCacheConfig {
optional uint64 MemoryLimit = 1 [default = 536870912];
optional uint64 ScanQueueInFlyLimit = 2 [default = 536870912];
optional uint64 AsyncQueueInFlyLimit = 3 [default = 536870912];
+ optional uint32 ActivePagesReservationPercent = 4;
}
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp
index 2ee4e51b567..f168297644e 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/shared_sausagecache.cpp
@@ -40,7 +40,7 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) {
}
class TSharedPageCache : public TActor<TSharedPageCache> {
- TIntrusiveConstPtr<TSharedPageCacheConfig> Config;
+ THolder<TSharedPageCacheConfig> Config;
using ELnLev = NUtil::ELnLev;
using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>;
@@ -176,9 +176,25 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
ui64 StatHitBytes = 0;
ui64 StatMissPages = 0;
ui64 StatMissBytes = 0;
+ ui64 StatPassiveBytes = 0;
bool GCScheduled = false;
+ void ActualizeCacheSizeLimit() {
+ if ((ui64)SizeOverride != Config->CacheConfig->Limit) {
+ Config->CacheConfig->SetLimit(SizeOverride);
+ }
+
+ ui64 limit = Config->CacheConfig->Limit;
+ limit = Max<ui64>(1,
+ limit * Config->ActivePagesReservationPercent / 100,
+ limit > StatPassiveBytes ? limit - StatPassiveBytes : 0);
+
+ Cache.UpdateCacheSize(limit);
+
+ Evict(Cache.EnsureLimits());
+ }
+
void Registered(TActorSystem *sys, const TActorId &owner)
{
Owner = owner;
@@ -231,10 +247,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
void Handle(NSharedCache::TEvRequest::TPtr &ev) {
- if ((ui64)SizeOverride != Config->CacheConfig->Limit) {
- Config->CacheConfig->SetLimit(SizeOverride);
- Cache.UpdateCacheSize(Config->CacheConfig->Limit);
- }
+ ActualizeCacheSizeLimit();
NSharedCache::TEvRequest *msg = ev->Get();
const auto &pageCollection = *msg->Fetch->PageCollection;
@@ -530,6 +543,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
void Handle(NSharedCache::TEvTouch::TPtr &ev) {
+ ActualizeCacheSizeLimit();
+
NSharedCache::TEvTouch *msg = ev->Get();
THashMap<TLogoBlobID, NSharedCache::TEvUpdated::TActions> actions;
for (auto &xpair : msg->Touched) {
@@ -611,7 +626,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
if (ownerIt != CollectionsOwners.end()) {
for (auto* collection : ownerIt->second.Collections) {
collection->Owners.erase(ev->Sender);
- DropExpiredCollection(collection);
+ TryDropExpiredCollection(collection);
}
CollectionsOwners.erase(ownerIt);
}
@@ -654,6 +669,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
void Handle(NBlockIO::TEvData::TPtr &ev) {
+ ActualizeCacheSizeLimit();
+
auto *msg = ev->Get();
if (TRequestQueue *queue = (TRequestQueue *)ev->Cookie) {
@@ -694,7 +711,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
ProcessGCList();
}
- void DropExpiredCollection(TCollection* collection) {
+ void TryDropExpiredCollection(TCollection* collection) {
if (!collection->Owners &&
!collection->Expectants &&
collection->PageMap.used() == 0)
@@ -720,9 +737,10 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
// We have successfully dropped the page
// We are guaranteed no new uses for this page are possible
Y_VERIFY(page->State == PageStateEvicted);
- // It may be the case that collection has been dropped
+ RemovePassivePage(page);
+
+ Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection");
if (auto* collection = page->Collection) {
- RemovePassivePage(page);
Y_VERIFY_DEBUG(collection->PageMap[page->PageId].Get() == page);
Y_VERIFY(collection->PageMap.erase(page->PageId));
if (collection->Owners) {
@@ -758,7 +776,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
collection->DroppedPages.clear();
}
- DropExpiredCollection(collection);
+ TryDropExpiredCollection(collection);
}
for (auto& kv : toSend) {
@@ -962,8 +980,12 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
if (msg->Record.GetMemoryLimit() != 0) {
Config->CacheConfig->SetLimit(msg->Record.GetMemoryLimit());
- Cache.UpdateCacheSize(Config->CacheConfig->Limit);
SizeOverride = Config->CacheConfig->Limit;
+ // limit will be updated with ActualizeCacheSizeLimit call
+ }
+
+ if (msg->Record.HasActivePagesReservationPercent()) {
+ Config->ActivePagesReservationPercent = msg->Record.GetActivePagesReservationPercent();
}
if (msg->Record.GetAsyncQueueInFlyLimit() != 0) {
@@ -985,6 +1007,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
inline void AddPassivePage(const TPage* page) {
+ StatPassiveBytes += sizeof(TPage) + page->Size;
if (Config->Counters) {
++*Config->Counters->PassivePages;
*Config->Counters->PassiveBytes += sizeof(TPage) + page->Size;
@@ -999,6 +1022,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
inline void RemovePassivePage(const TPage* page) {
+ StatPassiveBytes -= sizeof(TPage) + page->Size;
if (Config->Counters) {
--*Config->Counters->PassivePages;
*Config->Counters->PassiveBytes -= sizeof(TPage) + page->Size;
@@ -1006,11 +1030,11 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
public:
- TSharedPageCache(TSharedPageCacheConfig *config)
+ TSharedPageCache(THolder<TSharedPageCacheConfig> config)
: TActor(&TThis::StateFunc)
- , Config(config)
- , Cache(*config->CacheConfig)
- , SizeOverride(config->CacheConfig->Limit, 1, Max<i64>())
+ , Config(std::move(config))
+ , Cache(*Config->CacheConfig)
+ , SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>())
{
AsyncRequests.Limit = Config->TotalAsyncQueueInFlyLimit;
ScanRequests.Limit = Config->TotalScanQueueInFlyLimit;
@@ -1037,8 +1061,8 @@ public:
} // NTabletFlatExecutor
-IActor* CreateSharedPageCache(TSharedPageCacheConfig *config) {
- return new NTabletFlatExecutor::TSharedPageCache(config);
+IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config) {
+ return new NTabletFlatExecutor::TSharedPageCache(std::move(config));
}
}
diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h
index 51d0682e38e..cf5e2f1c140 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.h
+++ b/ydb/core/tablet_flat/shared_sausagecache.h
@@ -39,15 +39,16 @@ struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheC
explicit TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group);
};
-struct TSharedPageCacheConfig final : public TAtomicRefCount<TSharedPageCacheConfig> {
+struct TSharedPageCacheConfig {
TIntrusivePtr<TCacheCacheConfig> CacheConfig;
ui64 TotalScanQueueInFlyLimit = 512 * 1024 * 1024;
ui64 TotalAsyncQueueInFlyLimit = 512 * 1024 * 1024;
TString CacheName = "SharedPageCache";
TIntrusivePtr<TSharedPageCacheCounters> Counters;
+ ui32 ActivePagesReservationPercent = 50;
};
-IActor* CreateSharedPageCache(TSharedPageCacheConfig *config);
+IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config);
inline TActorId MakeSharedPageCacheId(ui64 id = 0) {
char x[12] = { 's', 'h', 's', 'c' };
diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h
index 3b58ab814b4..847e7f80429 100644
--- a/ydb/core/tablet_flat/test/libs/exec/runner.h
+++ b/ydb/core/tablet_flat/test/libs/exec/runner.h
@@ -178,13 +178,13 @@ namespace NFake {
}
{ /*_ Shared page collection cache service, used by executor */
- auto egg = MakeIntrusive<TSharedPageCacheConfig>();
+ auto config = MakeHolder<TSharedPageCacheConfig>();
- egg->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr);
- egg->TotalAsyncQueueInFlyLimit = conf.AsyncQueue;
- egg->TotalScanQueueInFlyLimit = conf.ScanQueue;
+ config->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr);
+ config->TotalAsyncQueueInFlyLimit = conf.AsyncQueue;
+ config->TotalScanQueueInFlyLimit = conf.ScanQueue;
- auto *actor = CreateSharedPageCache(egg.Get());
+ auto *actor = CreateSharedPageCache(std::move(config));
RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled);
}
diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp
index 7cbff912a73..130446df025 100644
--- a/ydb/core/testlib/basics/services.cpp
+++ b/ydb/core/testlib/basics/services.cpp
@@ -129,14 +129,14 @@ namespace NPDisk {
void SetupSharedPageCache(TTestActorRuntime& runtime, ui32 nodeIndex, NFake::TCaches caches)
{
- auto pageCollectionCacheConfig = MakeIntrusive<TSharedPageCacheConfig>();
+ auto pageCollectionCacheConfig = MakeHolder<TSharedPageCacheConfig>();
pageCollectionCacheConfig->CacheConfig = new TCacheCacheConfig(caches.Shared, nullptr, nullptr, nullptr);
pageCollectionCacheConfig->TotalAsyncQueueInFlyLimit = caches.AsyncQueue;
pageCollectionCacheConfig->TotalScanQueueInFlyLimit = caches.ScanQueue;
runtime.AddLocalService(MakeSharedPageCacheId(0),
TActorSetupCmd(
- CreateSharedPageCache(pageCollectionCacheConfig.Get()),
+ CreateSharedPageCache(std::move(pageCollectionCacheConfig)),
TMailboxType::ReadAsFilled,
0),
nodeIndex);
diff --git a/ydb/core/util/cache_cache.h b/ydb/core/util/cache_cache.h
index 850b6b05f5e..939b438f196 100644
--- a/ydb/core/util/cache_cache.h
+++ b/ydb/core/util/cache_cache.h
@@ -80,7 +80,27 @@ public:
, WarmWeight(0)
{}
- TItem* Touch(TItem *item) { // returns evicted elements as list
+ // returns evicted elements as list
+ TItem* EnsureLimits() {
+ TItem *ret = nullptr;
+ ret = LimitFresh(ret);
+ ret = LimitWarm(ret);
+ ret = LimitStaging(ret);
+
+ if (ret) {
+ if (Config.ReportedFresh)
+ *Config.ReportedFresh = FreshWeight;
+ if (Config.ReportedWarm)
+ *Config.ReportedWarm = WarmWeight;
+ if (Config.ReportedStaging)
+ *Config.ReportedStaging = StagingWeight;
+ }
+
+ return ret;
+ }
+
+ // returns evicted elements as list
+ TItem* Touch(TItem *item) {
TIntrusiveListItem<TItem> *xitem = item;
const TCacheCacheConfig::ECacheGeneration cacheGen = GenerationOp.Get(item);
@@ -176,14 +196,7 @@ private:
}
TItem* AddToFresh(TItem *item) {
- TItem *ret = nullptr;
- while (FreshWeight > Config.FreshLimit) {
- Y_VERIFY_DEBUG(!FreshList.Empty());
- TItem *x = FreshList.PopBack();
- Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenFresh, "malformed entry in fresh cache. %" PRIu32, (ui32)GenerationOp.Get(x));
- Unlink(x, FreshWeight);
- ret = AddToStaging(x, ret);
- }
+ TItem *ret = LimitFresh(nullptr);
item->Unlink();
FreshWeight += WeightOp.Get(item);
FreshList.PushFront(item);
@@ -198,15 +211,8 @@ private:
}
TItem* MoveToWarm(TItem *item) {
- TItem *ret = nullptr;
+ TItem *ret = LimitWarm(nullptr);
Unlink(item, StagingWeight);
- while (WarmWeight > Config.WarmLimit) {
- Y_VERIFY_DEBUG(!WarmList.Empty());
- TItem *x = WarmList.PopBack();
- Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenWarm, "malformed entry in warm cache. %" PRIu32, (ui32)GenerationOp.Get(x));
- Unlink(x, WarmWeight);
- ret = AddToStaging(x, ret);
- }
WarmWeight += WeightOp.Get(item);
WarmList.PushFront(item);
GenerationOp.Set(item, TCacheCacheConfig::CacheGenWarm);
@@ -220,6 +226,36 @@ private:
}
TItem* AddToStaging(TItem *item, TItem *ret) {
+ ret = LimitStaging(ret);
+ StagingWeight += WeightOp.Get(item);
+ StagingList.PushFront(item);
+ GenerationOp.Set(item, TCacheCacheConfig::CacheGenStaging);
+ return ret;
+ }
+
+ TItem* LimitFresh(TItem *ret) {
+ while (FreshWeight > Config.FreshLimit) {
+ Y_VERIFY_DEBUG(!FreshList.Empty());
+ TItem *x = FreshList.PopBack();
+ Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenFresh, "malformed entry in fresh cache. %" PRIu32, (ui32)GenerationOp.Get(x));
+ Unlink(x, FreshWeight);
+ ret = AddToStaging(x, ret);
+ }
+ return ret;
+ }
+
+ TItem* LimitWarm(TItem *ret) {
+ while (WarmWeight > Config.WarmLimit) {
+ Y_VERIFY_DEBUG(!WarmList.Empty());
+ TItem *x = WarmList.PopBack();
+ Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenWarm, "malformed entry in warm cache. %" PRIu32, (ui32)GenerationOp.Get(x));
+ Unlink(x, WarmWeight);
+ ret = AddToStaging(x, ret);
+ }
+ return ret;
+ }
+
+ TItem* LimitStaging(TItem *ret) {
while (StagingWeight > Config.StagingLimit) {
Y_VERIFY_DEBUG(!StagingList.Empty());
TItem *evicted = StagingList.PopBack();
@@ -231,11 +267,9 @@ private:
else
evicted->LinkBefore(ret);
}
- StagingWeight += WeightOp.Get(item);
- StagingList.PushFront(item);
- GenerationOp.Set(item, TCacheCacheConfig::CacheGenStaging);
return ret;
}
+
private:
TCacheCacheConfig Config;