diff options
author | kungasc <kungasc@yandex-team.com> | 2023-06-30 13:04:38 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-06-30 13:04:38 +0300 |
commit | fc83a97aeeb5f273651b72e2c8973b4e976e6352 (patch) | |
tree | eb3948518ce7f0e442c0c0934a712b56c437cfe0 | |
parent | 1962b16a839f1b905bba4a0606c1f352cfb12d59 (diff) | |
download | ydb-fc83a97aeeb5f273651b72e2c8973b4e976e6352.tar.gz |
Limit shared cache with passive pages
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/shared_cache.proto | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/shared_sausagecache.cpp | 58 | ||||
-rw-r--r-- | ydb/core/tablet_flat/shared_sausagecache.h | 5 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/exec/runner.h | 10 | ||||
-rw-r--r-- | ydb/core/testlib/basics/services.cpp | 4 | ||||
-rw-r--r-- | ydb/core/util/cache_cache.h | 74 |
7 files changed, 108 insertions, 48 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 51eba3cae4d..83f765ed441 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1387,7 +1387,7 @@ TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConf void TSharedCacheInitializer::InitializeServices( NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { - auto config = MakeIntrusive<TSharedPageCacheConfig>(); + auto config = MakeHolder<TSharedPageCacheConfig>(); NKikimrSharedCache::TSharedCacheConfig cfg; if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasSharedCacheConfig()) { @@ -1410,7 +1410,7 @@ void TSharedCacheInitializer::InitializeServices( config->Counters = new TSharedPageCacheCounters(sausageGroup); setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0), - TActorSetupCmd(CreateSharedPageCache(config.Get()), TMailboxType::ReadAsFilled, appData->UserPoolId))); + TActorSetupCmd(CreateSharedPageCache(std::move(config)), TMailboxType::ReadAsFilled, appData->UserPoolId))); auto *configurator = NConsole::CreateSharedCacheConfigurator(); setup->LocalServices.emplace_back(TActorId(), diff --git a/ydb/core/protos/shared_cache.proto b/ydb/core/protos/shared_cache.proto index bcfa8d0593b..8cca85c0f90 100644 --- a/ydb/core/protos/shared_cache.proto +++ b/ydb/core/protos/shared_cache.proto @@ -5,4 +5,5 @@ message TSharedCacheConfig { optional uint64 MemoryLimit = 1 [default = 536870912]; optional uint64 ScanQueueInFlyLimit = 2 [default = 536870912]; optional uint64 AsyncQueueInFlyLimit = 3 [default = 536870912]; + optional uint32 ActivePagesReservationPercent = 4; } diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp index 2ee4e51b567..f168297644e 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/shared_sausagecache.cpp @@ -40,7 +40,7 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) { } class TSharedPageCache : public TActor<TSharedPageCache> { - TIntrusiveConstPtr<TSharedPageCacheConfig> Config; + THolder<TSharedPageCacheConfig> Config; using ELnLev = NUtil::ELnLev; using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>; @@ -176,9 +176,25 @@ class TSharedPageCache : public TActor<TSharedPageCache> { ui64 StatHitBytes = 0; ui64 StatMissPages = 0; ui64 StatMissBytes = 0; + ui64 StatPassiveBytes = 0; bool GCScheduled = false; + void ActualizeCacheSizeLimit() { + if ((ui64)SizeOverride != Config->CacheConfig->Limit) { + Config->CacheConfig->SetLimit(SizeOverride); + } + + ui64 limit = Config->CacheConfig->Limit; + limit = Max<ui64>(1, + limit * Config->ActivePagesReservationPercent / 100, + limit > StatPassiveBytes ? limit - StatPassiveBytes : 0); + + Cache.UpdateCacheSize(limit); + + Evict(Cache.EnsureLimits()); + } + void Registered(TActorSystem *sys, const TActorId &owner) { Owner = owner; @@ -231,10 +247,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } void Handle(NSharedCache::TEvRequest::TPtr &ev) { - if ((ui64)SizeOverride != Config->CacheConfig->Limit) { - Config->CacheConfig->SetLimit(SizeOverride); - Cache.UpdateCacheSize(Config->CacheConfig->Limit); - } + ActualizeCacheSizeLimit(); NSharedCache::TEvRequest *msg = ev->Get(); const auto &pageCollection = *msg->Fetch->PageCollection; @@ -530,6 +543,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } void Handle(NSharedCache::TEvTouch::TPtr &ev) { + ActualizeCacheSizeLimit(); + NSharedCache::TEvTouch *msg = ev->Get(); THashMap<TLogoBlobID, NSharedCache::TEvUpdated::TActions> actions; for (auto &xpair : msg->Touched) { @@ -611,7 +626,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { if (ownerIt != CollectionsOwners.end()) { for (auto* collection : ownerIt->second.Collections) { collection->Owners.erase(ev->Sender); - DropExpiredCollection(collection); + TryDropExpiredCollection(collection); } CollectionsOwners.erase(ownerIt); } @@ -654,6 +669,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } void Handle(NBlockIO::TEvData::TPtr &ev) { + ActualizeCacheSizeLimit(); + auto *msg = ev->Get(); if (TRequestQueue *queue = (TRequestQueue *)ev->Cookie) { @@ -694,7 +711,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { ProcessGCList(); } - void DropExpiredCollection(TCollection* collection) { + void TryDropExpiredCollection(TCollection* collection) { if (!collection->Owners && !collection->Expectants && collection->PageMap.used() == 0) @@ -720,9 +737,10 @@ class TSharedPageCache : public TActor<TSharedPageCache> { // We have successfully dropped the page // We are guaranteed no new uses for this page are possible Y_VERIFY(page->State == PageStateEvicted); - // It may be the case that collection has been dropped + RemovePassivePage(page); + + Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection"); if (auto* collection = page->Collection) { - RemovePassivePage(page); Y_VERIFY_DEBUG(collection->PageMap[page->PageId].Get() == page); Y_VERIFY(collection->PageMap.erase(page->PageId)); if (collection->Owners) { @@ -758,7 +776,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { collection->DroppedPages.clear(); } - DropExpiredCollection(collection); + TryDropExpiredCollection(collection); } for (auto& kv : toSend) { @@ -962,8 +980,12 @@ class TSharedPageCache : public TActor<TSharedPageCache> { if (msg->Record.GetMemoryLimit() != 0) { Config->CacheConfig->SetLimit(msg->Record.GetMemoryLimit()); - Cache.UpdateCacheSize(Config->CacheConfig->Limit); SizeOverride = Config->CacheConfig->Limit; + // limit will be updated with ActualizeCacheSizeLimit call + } + + if (msg->Record.HasActivePagesReservationPercent()) { + Config->ActivePagesReservationPercent = msg->Record.GetActivePagesReservationPercent(); } if (msg->Record.GetAsyncQueueInFlyLimit() != 0) { @@ -985,6 +1007,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } inline void AddPassivePage(const TPage* page) { + StatPassiveBytes += sizeof(TPage) + page->Size; if (Config->Counters) { ++*Config->Counters->PassivePages; *Config->Counters->PassiveBytes += sizeof(TPage) + page->Size; @@ -999,6 +1022,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } inline void RemovePassivePage(const TPage* page) { + StatPassiveBytes -= sizeof(TPage) + page->Size; if (Config->Counters) { --*Config->Counters->PassivePages; *Config->Counters->PassiveBytes -= sizeof(TPage) + page->Size; @@ -1006,11 +1030,11 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } public: - TSharedPageCache(TSharedPageCacheConfig *config) + TSharedPageCache(THolder<TSharedPageCacheConfig> config) : TActor(&TThis::StateFunc) - , Config(config) - , Cache(*config->CacheConfig) - , SizeOverride(config->CacheConfig->Limit, 1, Max<i64>()) + , Config(std::move(config)) + , Cache(*Config->CacheConfig) + , SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>()) { AsyncRequests.Limit = Config->TotalAsyncQueueInFlyLimit; ScanRequests.Limit = Config->TotalScanQueueInFlyLimit; @@ -1037,8 +1061,8 @@ public: } // NTabletFlatExecutor -IActor* CreateSharedPageCache(TSharedPageCacheConfig *config) { - return new NTabletFlatExecutor::TSharedPageCache(config); +IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config) { + return new NTabletFlatExecutor::TSharedPageCache(std::move(config)); } } diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h index 51d0682e38e..cf5e2f1c140 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.h +++ b/ydb/core/tablet_flat/shared_sausagecache.h @@ -39,15 +39,16 @@ struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheC explicit TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group); }; -struct TSharedPageCacheConfig final : public TAtomicRefCount<TSharedPageCacheConfig> { +struct TSharedPageCacheConfig { TIntrusivePtr<TCacheCacheConfig> CacheConfig; ui64 TotalScanQueueInFlyLimit = 512 * 1024 * 1024; ui64 TotalAsyncQueueInFlyLimit = 512 * 1024 * 1024; TString CacheName = "SharedPageCache"; TIntrusivePtr<TSharedPageCacheCounters> Counters; + ui32 ActivePagesReservationPercent = 50; }; -IActor* CreateSharedPageCache(TSharedPageCacheConfig *config); +IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config); inline TActorId MakeSharedPageCacheId(ui64 id = 0) { char x[12] = { 's', 'h', 's', 'c' }; diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h index 3b58ab814b4..847e7f80429 100644 --- a/ydb/core/tablet_flat/test/libs/exec/runner.h +++ b/ydb/core/tablet_flat/test/libs/exec/runner.h @@ -178,13 +178,13 @@ namespace NFake { } { /*_ Shared page collection cache service, used by executor */ - auto egg = MakeIntrusive<TSharedPageCacheConfig>(); + auto config = MakeHolder<TSharedPageCacheConfig>(); - egg->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr); - egg->TotalAsyncQueueInFlyLimit = conf.AsyncQueue; - egg->TotalScanQueueInFlyLimit = conf.ScanQueue; + config->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr); + config->TotalAsyncQueueInFlyLimit = conf.AsyncQueue; + config->TotalScanQueueInFlyLimit = conf.ScanQueue; - auto *actor = CreateSharedPageCache(egg.Get()); + auto *actor = CreateSharedPageCache(std::move(config)); RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled); } diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp index 7cbff912a73..130446df025 100644 --- a/ydb/core/testlib/basics/services.cpp +++ b/ydb/core/testlib/basics/services.cpp @@ -129,14 +129,14 @@ namespace NPDisk { void SetupSharedPageCache(TTestActorRuntime& runtime, ui32 nodeIndex, NFake::TCaches caches) { - auto pageCollectionCacheConfig = MakeIntrusive<TSharedPageCacheConfig>(); + auto pageCollectionCacheConfig = MakeHolder<TSharedPageCacheConfig>(); pageCollectionCacheConfig->CacheConfig = new TCacheCacheConfig(caches.Shared, nullptr, nullptr, nullptr); pageCollectionCacheConfig->TotalAsyncQueueInFlyLimit = caches.AsyncQueue; pageCollectionCacheConfig->TotalScanQueueInFlyLimit = caches.ScanQueue; runtime.AddLocalService(MakeSharedPageCacheId(0), TActorSetupCmd( - CreateSharedPageCache(pageCollectionCacheConfig.Get()), + CreateSharedPageCache(std::move(pageCollectionCacheConfig)), TMailboxType::ReadAsFilled, 0), nodeIndex); diff --git a/ydb/core/util/cache_cache.h b/ydb/core/util/cache_cache.h index 850b6b05f5e..939b438f196 100644 --- a/ydb/core/util/cache_cache.h +++ b/ydb/core/util/cache_cache.h @@ -80,7 +80,27 @@ public: , WarmWeight(0) {} - TItem* Touch(TItem *item) { // returns evicted elements as list + // returns evicted elements as list + TItem* EnsureLimits() { + TItem *ret = nullptr; + ret = LimitFresh(ret); + ret = LimitWarm(ret); + ret = LimitStaging(ret); + + if (ret) { + if (Config.ReportedFresh) + *Config.ReportedFresh = FreshWeight; + if (Config.ReportedWarm) + *Config.ReportedWarm = WarmWeight; + if (Config.ReportedStaging) + *Config.ReportedStaging = StagingWeight; + } + + return ret; + } + + // returns evicted elements as list + TItem* Touch(TItem *item) { TIntrusiveListItem<TItem> *xitem = item; const TCacheCacheConfig::ECacheGeneration cacheGen = GenerationOp.Get(item); @@ -176,14 +196,7 @@ private: } TItem* AddToFresh(TItem *item) { - TItem *ret = nullptr; - while (FreshWeight > Config.FreshLimit) { - Y_VERIFY_DEBUG(!FreshList.Empty()); - TItem *x = FreshList.PopBack(); - Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenFresh, "malformed entry in fresh cache. %" PRIu32, (ui32)GenerationOp.Get(x)); - Unlink(x, FreshWeight); - ret = AddToStaging(x, ret); - } + TItem *ret = LimitFresh(nullptr); item->Unlink(); FreshWeight += WeightOp.Get(item); FreshList.PushFront(item); @@ -198,15 +211,8 @@ private: } TItem* MoveToWarm(TItem *item) { - TItem *ret = nullptr; + TItem *ret = LimitWarm(nullptr); Unlink(item, StagingWeight); - while (WarmWeight > Config.WarmLimit) { - Y_VERIFY_DEBUG(!WarmList.Empty()); - TItem *x = WarmList.PopBack(); - Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenWarm, "malformed entry in warm cache. %" PRIu32, (ui32)GenerationOp.Get(x)); - Unlink(x, WarmWeight); - ret = AddToStaging(x, ret); - } WarmWeight += WeightOp.Get(item); WarmList.PushFront(item); GenerationOp.Set(item, TCacheCacheConfig::CacheGenWarm); @@ -220,6 +226,36 @@ private: } TItem* AddToStaging(TItem *item, TItem *ret) { + ret = LimitStaging(ret); + StagingWeight += WeightOp.Get(item); + StagingList.PushFront(item); + GenerationOp.Set(item, TCacheCacheConfig::CacheGenStaging); + return ret; + } + + TItem* LimitFresh(TItem *ret) { + while (FreshWeight > Config.FreshLimit) { + Y_VERIFY_DEBUG(!FreshList.Empty()); + TItem *x = FreshList.PopBack(); + Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenFresh, "malformed entry in fresh cache. %" PRIu32, (ui32)GenerationOp.Get(x)); + Unlink(x, FreshWeight); + ret = AddToStaging(x, ret); + } + return ret; + } + + TItem* LimitWarm(TItem *ret) { + while (WarmWeight > Config.WarmLimit) { + Y_VERIFY_DEBUG(!WarmList.Empty()); + TItem *x = WarmList.PopBack(); + Y_VERIFY(GenerationOp.Get(x) == TCacheCacheConfig::CacheGenWarm, "malformed entry in warm cache. %" PRIu32, (ui32)GenerationOp.Get(x)); + Unlink(x, WarmWeight); + ret = AddToStaging(x, ret); + } + return ret; + } + + TItem* LimitStaging(TItem *ret) { while (StagingWeight > Config.StagingLimit) { Y_VERIFY_DEBUG(!StagingList.Empty()); TItem *evicted = StagingList.PopBack(); @@ -231,11 +267,9 @@ private: else evicted->LinkBefore(ret); } - StagingWeight += WeightOp.Get(item); - StagingList.PushFront(item); - GenerationOp.Set(item, TCacheCacheConfig::CacheGenStaging); return ret; } + private: TCacheCacheConfig Config; |