diff options
author | kungasc <kungasc@yandex-team.com> | 2023-07-13 11:25:17 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-07-13 11:25:17 +0300 |
commit | 7f994596c9b79105e75062185e097f690c2342b4 (patch) | |
tree | 5d549e7f0b8881c8dac3200147424177ad76251c | |
parent | dfdcf3308ce1eef73cc215b3bf2730292628ff63 (diff) | |
download | ydb-7f994596c9b79105e75062185e097f690c2342b4.tar.gz |
KIKIMR-18537 Shrink shared cache according to free memory
26 files changed, 569 insertions, 130 deletions
diff --git a/ydb/core/base/memobserver.h b/ydb/core/base/memobserver.h new file mode 100644 index 0000000000..033fc6aa57 --- /dev/null +++ b/ydb/core/base/memobserver.h @@ -0,0 +1,51 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr { + +class TMemObserver : public TThrRefBase { +public: + struct TMemStat { + ui64 Used; + ui64 HardLimit; + ui64 SoftLimit; + }; + + using TCallback = std::function<void()>; + + void Subscribe(TCallback callback) { + auto guard = Guard(Mutex); + Callbacks.push_back(std::move(callback)); + } + + void SetStat(TMemStat stat) { + auto guard = Guard(Mutex); + Stat = stat; + } + + void NotifyStat(TMemStat stat) { + TVector<TCallback> copy; + { + auto guard = Guard(Mutex); + copy = Callbacks; + Stat = stat; + } + + for (const auto &c : copy) { + c(); + } + } + + TMemStat GetStat() { + auto guard = Guard(Mutex); + return Stat; + } + +private: + TVector<TCallback> Callbacks; + TMutex Mutex; + TMemStat Stat; +}; + +} diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make index e6e2fe75a0..016b772782 100644 --- a/ydb/core/base/ya.make +++ b/ydb/core/base/ya.make @@ -34,6 +34,7 @@ SRCS( location.h logoblob.cpp logoblob.h + memobserver.h nameservice.h path.cpp pathid.cpp diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 5836e35379..8c158728d7 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1106,8 +1106,9 @@ void TLocalServiceInitializer::InitializeServices( // TSharedCacheInitializer -TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConfig) +TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver) : IKikimrServicesInitializer(runConfig) + , MemObserver(std::move(memObserver)) {} void TSharedCacheInitializer::InitializeServices( @@ -1136,7 +1137,7 @@ void TSharedCacheInitializer::InitializeServices( config->Counters = new TSharedPageCacheCounters(sausageGroup); setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0), - TActorSetupCmd(CreateSharedPageCache(std::move(config)), TMailboxType::ReadAsFilled, appData->UserPoolId))); + TActorSetupCmd(CreateSharedPageCache(std::move(config), MemObserver), TMailboxType::ReadAsFilled, appData->UserPoolId))); auto *configurator = NConsole::CreateSharedCacheConfigurator(); setup->LocalServices.emplace_back(TActorId(), @@ -2004,8 +2005,9 @@ void TPersQueueClusterTrackerInitializer::InitializeServices(NActors::TActorSyst // TMemProfMonitorInitializer -TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig) +TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver) : IKikimrServicesInitializer(runConfig) + , MemObserver(std::move(memObserver)) {} void TMemProfMonitorInitializer::InitializeServices( @@ -2019,6 +2021,7 @@ void TMemProfMonitorInitializer::InitializeServices( } IActor* monitorActor = CreateMemProfMonitor( + MemObserver, 1, // seconds appData->Counters, filePathPrefix); diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 3075e8dcb3..ad5aebe6dd 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -85,8 +85,10 @@ public: }; class TSharedCacheInitializer : public IKikimrServicesInitializer { + TIntrusivePtr<TMemObserver> MemObserver; + public: - TSharedCacheInitializer(const TKikimrRunConfig& runConfig); + TSharedCacheInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver); void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override; }; @@ -354,8 +356,10 @@ public: }; class TMemProfMonitorInitializer : public IKikimrServicesInitializer { + TIntrusivePtr<TMemObserver> MemObserver; + public: - TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig); + TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver); void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 106a41dd3a..e2c31c8261 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -369,6 +369,7 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr<TModuleFactories> factories) : ModuleFactories(std::move(factories)) , Counters(MakeIntrusive<::NMonitoring::TDynamicCounters>()) , PollerThreads(new NInterconnect::TPollerThreads) + , MemObserver(MakeIntrusive<TMemObserver>()) { } @@ -1397,7 +1398,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TLocalServiceInitializer(runConfig)); } if (serviceMask.EnableSharedCache) { - sil->AddServiceInitializer(new TSharedCacheInitializer(runConfig)); + sil->AddServiceInitializer(new TSharedCacheInitializer(runConfig, MemObserver)); } if (serviceMask.EnableBlobCache) { sil->AddServiceInitializer(new TBlobCacheInitializer(runConfig)); @@ -1494,7 +1495,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TNetClassifierInitializer(runConfig)); } - sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig)); + sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig, MemObserver)); #if defined(ENABLE_MEMORY_TRACKING) sil->AddServiceInitializer(new TMemoryTrackerInitializer(runConfig)); diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h index 89dda5dde7..8c6becf2a3 100644 --- a/ydb/core/driver_lib/run/run.h +++ b/ydb/core/driver_lib/run/run.h @@ -16,6 +16,7 @@ #include <ydb/core/client/server/grpc_server.h> #include <ydb/core/fq/libs/shared_resources/interface/shared_resources.h> #include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/base/memobserver.h> #include <ydb/core/tablet/node_tablet_monitor.h> #include <ydb/core/tablet/tablet_setup.h> #include <ydb/core/ymq/http/http.h> @@ -65,6 +66,8 @@ protected: std::shared_ptr<TLogBackend> LogBackend; TAutoPtr<TActorSystem> ActorSystem; + TIntrusivePtr<TMemObserver> MemObserver; + TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {}); virtual ~TKikimrRunner(); diff --git a/ydb/core/mon_alloc/monitor.cpp b/ydb/core/mon_alloc/monitor.cpp index d6b955f380..7554f671aa 100644 --- a/ydb/core/mon_alloc/monitor.cpp +++ b/ydb/core/mon_alloc/monitor.cpp @@ -400,8 +400,11 @@ namespace NKikimr { struct TDumpLogConfig { static constexpr double RssUsageHard = 0.9; static constexpr double RssUsageSoft = 0.85; + static constexpr double RssUsageSoftLimit = 0.8; + static constexpr double RssUsageNotifySlowLimit = 0.7; static constexpr TDuration RepeatInterval = TDuration::Seconds(10); static constexpr TDuration DumpInterval = TDuration::Minutes(10); + static constexpr TDuration NotifySlowInterval = TDuration::Seconds(10); }; enum { @@ -411,11 +414,13 @@ namespace NKikimr { struct TEvDumpLogStats : public TEventLocal<TEvDumpLogStats, EvDumpLogStats> {}; + const TIntrusivePtr<TMemObserver> MemObserver; const TDuration Interval; const std::unique_ptr<IAllocMonitor> AllocMonitor; const TString FilePathPrefix; TInstant LogMemoryStatsTime = TInstant::Now() - TDumpLogConfig::DumpInterval; + TInstant NotifyMemoryStatsTime = TInstant::Now() - TDumpLogConfig::NotifySlowInterval; bool IsDangerous = false; @@ -424,8 +429,9 @@ namespace NKikimr { return EActivityType::ACTORLIB_STATS; } - TMemProfMonitor(TDuration interval, std::unique_ptr<IAllocMonitor> allocMonitor, const TString& filePathPrefix) - : Interval(interval) + TMemProfMonitor(TIntrusivePtr<TMemObserver> memObserver, TDuration interval, std::unique_ptr<IAllocMonitor> allocMonitor, const TString& filePathPrefix) + : MemObserver(std::move(memObserver)) + , Interval(interval) , AllocMonitor(std::move(allocMonitor)) , FilePathPrefix(filePathPrefix) {} @@ -487,8 +493,7 @@ namespace NKikimr { } } - void LogMemoryStatsIfNeeded(const TActorContext& ctx) noexcept { - auto memoryUsage = TAllocState::GetMemoryUsage(); + void LogMemoryStatsIfNeeded(const TActorContext& ctx, TMemoryUsage memoryUsage) noexcept { auto usage = memoryUsage.Usage(); LOG_DEBUG_S(ctx, NKikimrServices::MEMORY_PROFILER, memoryUsage.ToString()); if (IsDangerous && usage < TDumpLogConfig::RssUsageSoft) { @@ -504,7 +509,23 @@ namespace NKikimr { void HandleWakeup(const TActorContext& ctx) noexcept { AllocMonitor->Update(Interval); - LogMemoryStatsIfNeeded(ctx); + auto memoryUsage = TAllocState::GetMemoryUsage(); + LogMemoryStatsIfNeeded(ctx, memoryUsage); + + TMemObserver::TMemStat stat{ + memoryUsage.AnonRss, + memoryUsage.CGroupLimit, + static_cast<ui64>(memoryUsage.CGroupLimit * TDumpLogConfig::RssUsageSoftLimit)}; + + if (memoryUsage.AnonRss > TDumpLogConfig::RssUsageNotifySlowLimit || + TInstant::Now() - NotifyMemoryStatsTime > TDumpLogConfig::NotifySlowInterval) { + NotifyMemoryStatsTime = TInstant::Now(); + MemObserver->NotifyStat(stat); + } else { + // fast path: don't call callback, but update current stat + MemObserver->SetStat(stat); + } + ctx.Schedule(Interval, new TEvents::TEvWakeup()); } @@ -531,8 +552,9 @@ namespace NKikimr { }; } - IActor* CreateMemProfMonitor(ui32 intervalSec, TDynamicCountersPtr counters, const TString& filePathPrefix) { + IActor* CreateMemProfMonitor(TIntrusivePtr<TMemObserver> memObserver, ui32 intervalSec, TDynamicCountersPtr counters, const TString& filePathPrefix) { return new TMemProfMonitor( + memObserver, TDuration::Seconds(intervalSec), CreateAllocMonitor(GetServiceCounters(counters, "utils")), filePathPrefix); diff --git a/ydb/core/mon_alloc/monitor.h b/ydb/core/mon_alloc/monitor.h index 52ade6297a..00e4421f8b 100644 --- a/ydb/core/mon_alloc/monitor.h +++ b/ydb/core/mon_alloc/monitor.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/base/memobserver.h> #include <ydb/core/control/immediate_control_board_impl.h> #include <library/cpp/actors/core/defs.h> @@ -45,6 +46,7 @@ namespace NKikimr { }; NActors::IActor* CreateMemProfMonitor( + TIntrusivePtr<TMemObserver> memObserver, ui32 intervalSec, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TString& filePathPrefix = ""); diff --git a/ydb/core/tablet_flat/flat_bio_actor.h b/ydb/core/tablet_flat/flat_bio_actor.h index caf3025c2e..faed351711 100644 --- a/ydb/core/tablet_flat/flat_bio_actor.h +++ b/ydb/core/tablet_flat/flat_bio_actor.h @@ -60,7 +60,7 @@ namespace NBlockIO { }; template<typename ... TArgs> - inline void Once(NActors::IActorOps *ops, TActorId service, + inline void Start(NActors::IActorOps *ops, TActorId service, ui64 cookie, TArgs&& ... args) noexcept { auto self = ops->Register(new TBlockIO(service, cookie)); diff --git a/ydb/core/tablet_flat/flat_sausagecache.cpp b/ydb/core/tablet_flat/flat_sausagecache.cpp index fc3a6cbdc3..327807d3fe 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.cpp +++ b/ydb/core/tablet_flat/flat_sausagecache.cpp @@ -4,12 +4,12 @@ namespace NKikimr { namespace NTabletFlatExecutor { -TPrivatePageCache::TPage::TPage(ui32 size, ui32 pageId, TInfo* info) +TPrivatePageCache::TPage::TPage(size_t size, ui32 pageId, TInfo* info) : LoadState(LoadStateNo) , Sticky(false) , SharedPending(false) - , Size(size) , Id(pageId) + , Size(size) , Info(info) {} @@ -462,7 +462,8 @@ void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRe if (!page) return; - Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug"); + // Note: shared cache may accept a peinding page if it is used by multiple private caches + // (for expample, used by tablet and its follower) if (Y_UNLIKELY(!page->SharedPending)) { return; } @@ -486,7 +487,8 @@ void TPrivatePageCache::DropSharedBody(TInfo *info, ui32 pageId) { if (!page) return; - Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug"); + // Note: shared cache may drop a peinding page if it is used by multiple private caches + // (for expample, used by tablet and its follower) if (Y_UNLIKELY(page->SharedPending)) { // Shared cache rejected our page so we should drop it too Stats.TotalSharedPending -= page->Size; diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h index 3a1b830e2e..bec3c919e2 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.h +++ b/ydb/core/tablet_flat/flat_sausagecache.h @@ -48,12 +48,12 @@ public: LoadStateRequestedAsync, }; - ui64 LoadState : 2; - ui64 Sticky : 1; - ui64 SharedPending : 1; - ui64 : 4; // padding - const ui64 Size : 24; - const ui64 Id : 32; + ui32 LoadState : 2; + ui32 Sticky : 1; + ui32 SharedPending : 1; + + const ui32 Id; + const size_t Size; TInfo* const Info; TIntrusivePtr<TPrivatePageCachePinPad> PinPad; @@ -61,7 +61,7 @@ public: TSharedPageRef SharedBody; TSharedData PinnedBody; - TPage(ui32 size, ui32 pageId, TInfo* info); + TPage(size_t size, ui32 pageId, TInfo* info); TPage(const TPage&) = delete; TPage(TPage&&) = delete; diff --git a/ydb/core/tablet_flat/shared_cache_events.h b/ydb/core/tablet_flat/shared_cache_events.h index e7ed55be96..a7abe542ef 100644 --- a/ydb/core/tablet_flat/shared_cache_events.h +++ b/ydb/core/tablet_flat/shared_cache_events.h @@ -26,6 +26,7 @@ namespace NSharedCache { EvRequest, EvResult, EvUpdated, + EvMem, EvEnd @@ -130,6 +131,9 @@ namespace NSharedCache { THashMap<TLogoBlobID, TActions> Actions; }; + struct TEvMem : public TEventLocal<TEvMem, EvMem> { + }; + } } diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp index c0cb5e7987..40d86df5fd 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/shared_sausagecache.cpp @@ -13,8 +13,10 @@ namespace NKikimr { TSharedPageCacheCounters::TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group) - : ActivePages(group->GetCounter("ActivePages")) + : MemLimitBytes(group->GetCounter("MemLimitBytes")) + , ActivePages(group->GetCounter("ActivePages")) , ActiveBytes(group->GetCounter("ActiveBytes")) + , ActiveLimitBytes(group->GetCounter("ActiveLimitBytes")) , PassivePages(group->GetCounter("PassivePages")) , PassiveBytes(group->GetCounter("PassiveBytes")) , RequestedPages(group->GetCounter("RequestedPages", true)) @@ -39,9 +41,7 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) { return false; } -class TSharedPageCache : public TActor<TSharedPageCache> { - THolder<TSharedPageCacheConfig> Config; - +class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { using ELnLev = NUtil::ELnLev; using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>; @@ -60,20 +60,21 @@ class TSharedPageCache : public TActor<TSharedPageCache> { : public TSharedPageHandle , public TIntrusiveListItem<TPage> { - ui64 State : 4; - ui64 CacheGeneration : 3; - ui64 InMemory : 1; - ui64 Size : 24; - const ui64 PageId : 32; + ui32 State : 4; + ui32 CacheGeneration : 3; + ui32 InMemory : 1; + + const ui32 PageId; + const size_t Size; TCollection* Collection; - TPage(ui32 pageId, TCollection* collection) + TPage(ui32 pageId, size_t size, TCollection* collection) : State(PageStateNo) , CacheGeneration(TCacheCacheConfig::CacheGenNone) , InMemory(false) - , Size(0) , PageId(pageId) + , Size(size) , Collection(collection) {} @@ -163,11 +164,13 @@ class TSharedPageCache : public TActor<TSharedPageCache> { TAutoPtr<NUtil::ILogger> Logger; THashMap<TLogoBlobID, TCollection> Collections; THashMap<TActorId, TCollectionsOwner> CollectionsOwners; + TIntrusivePtr<TMemObserver> MemObserver; TRequestQueue AsyncRequests; TRequestQueue ScanRequests; - TCacheCache<TPage, TPage::TWeight> Cache; + THolder<TSharedPageCacheConfig> Config; + TCacheCache<TPage, TPage::TWeight, TCacheCacheConfig::TDefaultGeneration<TPage>> Cache; TControlWrapper SizeOverride; @@ -176,27 +179,90 @@ class TSharedPageCache : public TActor<TSharedPageCache> { ui64 StatHitBytes = 0; ui64 StatMissPages = 0; ui64 StatMissBytes = 0; + ui64 StatActiveBytes = 0; ui64 StatPassiveBytes = 0; + ui64 StatLoadInFlyBytes = 0; bool GCScheduled = false; + // 0 means unlimited + ui64 MemLimitBytes = 0; + ui64 ConfigLimitBytes; + void ActualizeCacheSizeLimit() { if ((ui64)SizeOverride != Config->CacheConfig->Limit) { Config->CacheConfig->SetLimit(SizeOverride); } - ui64 limit = Config->CacheConfig->Limit; - limit = Max<ui64>(1, - limit * Config->ActivePagesReservationPercent / 100, - limit > StatPassiveBytes ? limit - StatPassiveBytes : 0); + ConfigLimitBytes = Config->CacheConfig->Limit; + ui64 limit = ConfigLimitBytes; + if (MemLimitBytes && ConfigLimitBytes > MemLimitBytes) { + limit = MemLimitBytes; + } + + // limit of cache depends only on config and mem because passive pages may go in and out arbitrary + // we may have some passive bytes, so if we fully fill this Cache we may exceed the limit + // because of that DoGC should be called to ensure limits Cache.UpdateCacheSize(limit); - Evict(Cache.EnsureLimits()); + if (Config->Counters) { + Config->Counters->ActiveLimitBytes->Set(limit); + } + } + + void DoGC() { + // maybe we already have enough useless pages + // update StatActiveBytes + StatPassiveBytes + ProcessGCList(); + + ui64 configActiveReservedBytes = ConfigLimitBytes * Config->ActivePagesReservationPercent / 100; + + THashSet<TCollection*> recheck; + while (MemLimitBytes && GetStatAllBytes() > MemLimitBytes + || GetStatAllBytes() > ConfigLimitBytes && StatActiveBytes > configActiveReservedBytes) { + auto page = Cache.EvictNext(); + if (!page) { + break; + } + EvictNow(page, recheck); + } + if (recheck) { + CheckExpiredCollections(std::move(recheck)); + } + } + + void Handle(NSharedCache::TEvMem::TPtr &) { + // always get the latest value + auto mem = MemObserver->GetStat(); + + if (mem.SoftLimit) { + ui64 usedExternal = 0; + // we have: mem.Used = usedExternal + StatAllBytes + if (mem.Used > GetStatAllBytes()) { + usedExternal = mem.Used - GetStatAllBytes(); + } + + // we want: MemLimitBytes + externalUsage <= mem.SoftLimit + MemLimitBytes = mem.SoftLimit > usedExternal + ? mem.SoftLimit - usedExternal + : 1; + } else { + MemLimitBytes = 0; + } + + if (Config->Counters) { + Config->Counters->MemLimitBytes->Set(MemLimitBytes); + } + + ActualizeCacheSizeLimit(); + + DoGC(); } void Registered(TActorSystem *sys, const TActorId &owner) { + NActors::TActorBootstrapped<TSharedPageCache>::Registered(sys, owner); Owner = owner; Logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_SAUSAGECACHE); @@ -286,8 +352,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { metaId.ToString().c_str(), pageId, collection.PageMap.size()); auto* page = collection.PageMap[pageId].Get(); if (!page) { - Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, &collection)))); - page->Size = pageCollection.Page(pageId).Size; + Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, pageCollection.Page(pageId).Size, &collection)))); } if (Config->Counters) { @@ -415,12 +480,9 @@ class TSharedPageCache : public TActor<TSharedPageCache> { if (queue) { RequestFromQueue(*queue); } else { - if (Config->Counters) { - *Config->Counters->LoadInFlyPages += pagesToRequest.size(); - *Config->Counters->LoadInFlyBytes += pagesToRequestBytes; - } + AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes); auto *fetch = new NPageCollection::TFetch(0, waitingRequest->PageCollection, std::move(pagesToRequest)); - NBlockIO::Once(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch); + NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch); } } } else { @@ -432,7 +494,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { SendReadyBlocks(*waitingRequest); } - ProcessGCList(); + DoGC(); } void RequestFromQueue(TRequestQueue &queue) { @@ -500,10 +562,11 @@ class TSharedPageCache : public TActor<TSharedPageCache> { << " pages " << toLoad; } + AddInFlyPages(toLoad.size(), sizeToLoad); // fetch cookie -> requested size; // event cookie -> ptr to queue auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad)); - NBlockIO::Once(this, wa.Owner, (ui64)&queue, wa.Priority, fetch); + NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch); } } } @@ -565,8 +628,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { auto* page = collection.PageMap[pageId].Get(); if (!page) { if (x.second) { - Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, &collection)))); - page->Size = x.second.size(); + Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, x.second.size(), &collection)))); } else { continue; } @@ -612,7 +674,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { Send(ev->Sender, msg.Release()); } - ProcessGCList(); + DoGC(); } void Handle(NSharedCache::TEvUnregister::TPtr &ev) { @@ -697,9 +759,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> { if (!page || !page->HasMissingBody()) continue; - if (page->State == PageStateRequested && Config->Counters) { - --*Config->Counters->LoadInFlyPages; - *Config->Counters->LoadInFlyBytes -= page->Size; + if (page->State == PageStateRequested || page->State == PageStateRequestedAsync) { + RemoveInFlyPage(page); } page->Initialize(std::move(paged.Data)); @@ -708,7 +769,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } } - ProcessGCList(); + DoGC(); } void TryDropExpiredCollection(TCollection* collection) { @@ -733,28 +794,38 @@ class TSharedPageCache : public TActor<TSharedPageCache> { while (auto rawPage = GCList->PopGC()) { auto* page = static_cast<TPage*>(rawPage.Get()); - if (page->TryDrop()) { - // We have successfully dropped the page - // We are guaranteed no new uses for this page are possible - Y_VERIFY(page->State == PageStateEvicted); - RemovePassivePage(page); - - Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection"); - if (auto* collection = page->Collection) { - Y_VERIFY_DEBUG(collection->PageMap[page->PageId].Get() == page); - Y_VERIFY(collection->PageMap.erase(page->PageId)); - if (collection->Owners) { - collection->DroppedPages.push_back(page->PageId); - } - recheck.insert(collection); - } - } + TryDrop(page, recheck); } if (recheck) { CheckExpiredCollections(std::move(recheck)); } + TryScheduleGC(); + } + + void TryDrop(TPage* page, THashSet<TCollection*>& recheck) { + if (page->TryDrop()) { + // We have successfully dropped the page + // We are guaranteed no new uses for this page are possible + Y_VERIFY(page->State == PageStateEvicted); + RemovePassivePage(page); + + Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection"); + if (auto* collection = page->Collection) { + auto pageId = page->PageId; + Y_VERIFY_DEBUG(collection->PageMap[pageId].Get() == page); + Y_VERIFY(collection->PageMap.erase(pageId)); + // Note: don't use page after erase as it may be deleted + if (collection->Owners) { + collection->DroppedPages.push_back(pageId); + } + recheck.insert(collection); + } + } + } + + void TryScheduleGC() { if (!GCScheduled) { TActivationContext::AsActorContext().Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup()); GCScheduled = true; @@ -858,11 +929,10 @@ class TSharedPageCache : public TActor<TSharedPageCache> { continue; } - if (page->State == PageStateRequested && Config->Counters) { + if (page->State == PageStateRequested) { // Request is technically inflight, but response will be ignored // Pretend request is cancelled for simplicity - --*Config->Counters->LoadInFlyPages; - *Config->Counters->LoadInFlyBytes -= page->Size; + RemoveInFlyPage(page); } page->Collection = nullptr; @@ -933,36 +1003,35 @@ class TSharedPageCache : public TActor<TSharedPageCache> { void Evict(TIntrusiveList<TPage>&& pages) { while (!pages.Empty()) { TPage* page = pages.PopFront(); - Y_VERIFY(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + + Y_VERIFY_S(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "unexpected " << page->CacheGeneration << " page cache generation"); page->CacheGeneration = TCacheCacheConfig::CacheGenNone; - switch (page->State) { - case PageStateNo: - Y_FAIL("unexpected uninitialized page"); - - case PageStateEvicted: - Y_FAIL("unexpected evicted page"); - - case PageStateRequested: - case PageStateRequestedAsync: - case PageStatePending: - break; - - case PageStateLoaded: - page->State = PageStateEvicted; - RemoveActivePage(page); - AddPassivePage(page); - if (page->UnUse()) { - GCList->PushGC(page); - } - break; - - default: - Y_FAIL("unknown load state"); + Y_VERIFY_S(page->State == PageStateLoaded, "unexpected " << page->State << " page state"); + page->State = PageStateEvicted; + + RemoveActivePage(page); + AddPassivePage(page); + if (page->UnUse()) { + GCList->PushGC(page); } } } + void EvictNow(TPage* page, THashSet<TCollection*>& recheck) { + Y_VERIFY_S(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "unexpected " << page->CacheGeneration << " page cache generation"); + page->CacheGeneration = TCacheCacheConfig::CacheGenNone; + + Y_VERIFY_S(page->State == PageStateLoaded, "unexpected " << page->State << " page state"); + page->State = PageStateEvicted; + + RemoveActivePage(page); + AddPassivePage(page); + if (page->UnUse()) { + TryDrop(page, recheck); + } + } + void Handle(TEvSharedPageCache::TEvConfigure::TPtr& ev) { const auto* msg = ev->Get(); @@ -987,7 +1056,12 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } } + inline ui64 GetStatAllBytes() const { + return StatActiveBytes + StatPassiveBytes + StatLoadInFlyBytes; + } + inline void AddActivePage(const TPage* page) { + StatActiveBytes += sizeof(TPage) + page->Size; if (Config->Counters) { ++*Config->Counters->ActivePages; *Config->Counters->ActiveBytes += sizeof(TPage) + page->Size; @@ -1002,7 +1076,17 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } } + inline void AddInFlyPages(ui64 count, ui64 size) { + StatLoadInFlyBytes += sizeof(TPage) * count + size; + if (Config->Counters) { + *Config->Counters->LoadInFlyPages += count; + *Config->Counters->LoadInFlyBytes += sizeof(TPage) * count + size; + } + } + inline void RemoveActivePage(const TPage* page) { + Y_VERIFY_DEBUG(StatActiveBytes >= sizeof(TPage) + page->Size); + StatActiveBytes -= sizeof(TPage) + page->Size; if (Config->Counters) { --*Config->Counters->ActivePages; *Config->Counters->ActiveBytes -= sizeof(TPage) + page->Size; @@ -1010,6 +1094,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } inline void RemovePassivePage(const TPage* page) { + Y_VERIFY_DEBUG(StatPassiveBytes >= sizeof(TPage) + page->Size); StatPassiveBytes -= sizeof(TPage) + page->Size; if (Config->Counters) { --*Config->Counters->PassivePages; @@ -1017,17 +1102,35 @@ class TSharedPageCache : public TActor<TSharedPageCache> { } } + inline void RemoveInFlyPage(const TPage* page) { + Y_VERIFY_DEBUG(StatLoadInFlyBytes >= sizeof(TPage) + page->Size); + StatLoadInFlyBytes -= sizeof(TPage) + page->Size; + if (Config->Counters) { + --*Config->Counters->LoadInFlyPages; + *Config->Counters->LoadInFlyBytes -= sizeof(TPage) + page->Size; + } + } + public: - TSharedPageCache(THolder<TSharedPageCacheConfig> config) - : TActor(&TThis::StateFunc) + TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver) + : MemObserver(std::move(memObserver)) , Config(std::move(config)) , Cache(*Config->CacheConfig) , SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>()) + , ConfigLimitBytes(Config->CacheConfig->Limit) { AsyncRequests.Limit = Config->TotalAsyncQueueInFlyLimit; ScanRequests.Limit = Config->TotalScanQueueInFlyLimit; } + void Bootstrap() { + MemObserver->Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()] () { + actorSystem->Send(selfId, new NSharedCache::TEvMem()); + }); + + Become(&TThis::StateFunc); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(NSharedCache::TEvAttach, Handle); @@ -1035,6 +1138,7 @@ public: hFunc(NSharedCache::TEvTouch, Handle); hFunc(NSharedCache::TEvUnregister, Handle); hFunc(NSharedCache::TEvInvalidate, Handle); + hFunc(NSharedCache::TEvMem, Handle); hFunc(NBlockIO::TEvData, Handle); hFunc(TEvSharedPageCache::TEvConfigure, Handle); cFunc(TEvents::TSystem::Wakeup, GCWakeup); @@ -1049,8 +1153,8 @@ public: } // NTabletFlatExecutor -IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config) { - return new NTabletFlatExecutor::TSharedPageCache(std::move(config)); +IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver) { + return new NTabletFlatExecutor::TSharedPageCache(std::move(config), std::move(memObserver)); } } diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h index cf5e2f1c14..28ae0825b4 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.h +++ b/ydb/core/tablet_flat/shared_sausagecache.h @@ -1,5 +1,6 @@ #pragma once #include "defs.h" +#include <ydb/core/base/memobserver.h> #include <ydb/core/protos/shared_cache.pb.h> #include <ydb/core/util/cache_cache.h> #include <util/system/unaligned_mem.h> @@ -23,8 +24,10 @@ struct TEvSharedPageCache { struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheCounters> { using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; + const TCounterPtr MemLimitBytes; const TCounterPtr ActivePages; const TCounterPtr ActiveBytes; + const TCounterPtr ActiveLimitBytes; const TCounterPtr PassivePages; const TCounterPtr PassiveBytes; const TCounterPtr RequestedPages; @@ -48,7 +51,7 @@ struct TSharedPageCacheConfig { ui32 ActivePagesReservationPercent = 50; }; -IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config); +IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver); inline TActorId MakeSharedPageCacheId(ui64 id = 0) { char x[12] = { 's', 'h', 's', 'c' }; diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h index 5196950aa3..5361c1f0c2 100644 --- a/ydb/core/tablet_flat/test/libs/exec/runner.h +++ b/ydb/core/tablet_flat/test/libs/exec/runner.h @@ -163,7 +163,7 @@ namespace NFake { { /*_ Resource broker service, used for generic scans */ using namespace NResourceBroker; - auto *actor = CreateResourceBrokerActor(MakeDefaultConfig(), Env.GetDynamicCounters(0)); + auto *actor = CreateResourceBrokerActor(MakeDefaultConfig(), Env.GetDynamicCounters()); AddService(MakeResourceBrokerID(), actor, EMail::Revolving); } @@ -183,8 +183,9 @@ namespace NFake { config->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr); config->TotalAsyncQueueInFlyLimit = conf.AsyncQueue; config->TotalScanQueueInFlyLimit = conf.ScanQueue; + config->Counters = MakeIntrusive<TSharedPageCacheCounters>(Env.GetDynamicCounters()); - auto *actor = CreateSharedPageCache(std::move(config)); + auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver()); RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled); } diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt index e76cc7340f..f5e5f0917d 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt @@ -71,6 +71,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt index 232fb00f89..4723dfaccd 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt @@ -74,6 +74,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt index 4f9b399a40..233cb18aa6 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt @@ -75,6 +75,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt index 716aa62ec4..b8a2359330 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt @@ -64,6 +64,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp new file mode 100644 index 0000000000..26bdf4f71e --- /dev/null +++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp @@ -0,0 +1,166 @@ +#include "flat_executor_ut_common.h" + +#include <util/system/sanitizers.h> + +namespace NKikimr { +namespace NTabletFlatExecutor { + +Y_UNIT_TEST_SUITE(TSharedPageCache) { + +constexpr i64 MB = 1024*1024; + +enum : ui32 { + TableId = 101, + KeyColumnId = 1, + ValueColumnId = 2, +}; + +struct TTxInitSchema : public ITransaction { + bool Execute(TTransactionContext& txc, const TActorContext&) override { + if (txc.DB.GetScheme().GetTableInfo(TableId)) + return true; + + txc.DB.Alter() + .AddTable("test" + ToString(ui32(TableId)), TableId) + .AddColumn(TableId, "key", KeyColumnId, NScheme::TInt64::TypeId, false) + .AddColumn(TableId, "value", ValueColumnId, NScheme::TString::TypeId, false) + .AddColumnToKey(TableId, KeyColumnId); + + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } +}; + +struct TTxWriteRow : public ITransaction { + i64 Key; + TString Value; + + explicit TTxWriteRow(i64 key, TString value) + : Key(key) + , Value(std::move(value)) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + const auto key = NScheme::TInt64::TInstance(Key); + + const auto val = NScheme::TString::TInstance(Value); + NTable::TUpdateOp ops{ ValueColumnId, NTable::ECellOp::Set, val }; + + txc.DB.Update(TableId, NTable::ERowOp::Upsert, { key }, { ops }); + + return true; + } + + void Complete(const TActorContext&ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } +}; + +struct TTxReadRow : public ITransaction { + i64 Key; + + explicit TTxReadRow(i64 key) + : Key(key) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + TVector<TRawTypeValue> rawKey; + rawKey.emplace_back(&Key, sizeof(Key), NScheme::TInt64::TypeId); + + TVector<NTable::TTag> tags; + tags.push_back(KeyColumnId); + tags.push_back(ValueColumnId); + + NTable::TRowState row; + auto ready = txc.DB.Select(TableId, rawKey, tags, row); + if (ready == NTable::EReady::Page) { + return false; + } + + return true; + } + + void Complete(const TActorContext&ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } +}; + +void LogCounters(TIntrusivePtr<TSharedPageCacheCounters> counters) { + Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val() << ", Passive:" << counters->PassiveBytes->Val() << ", MemLimit:" << counters->MemLimitBytes->Val() << Endl; +} + +void WaitMemEvent(TMyEnvBase& env) { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1)); + env->DispatchEvents(options); +} + +Y_UNIT_TEST(Limits) { + TMyEnvBase env; + auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters()); + + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() }); + + // write 300 rows, each ~100KB + for (i64 key = 0; key < 300; ++key) { + TString value(size_t(100 * 1024), char('a' + key % 26)); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) }); + } + + Cerr << "...compacting" << Endl; + env.SendSync(new NFake::TEvCompact(TableId)); + Cerr << "...waiting until compacted" << Endl; + env.WaitFor<NFake::TEvCompacted>(); + + for (i64 key = 0; key < 100; ++key) { + env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key) }); + } + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); // 2 full layers (fresh & staging) + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); + UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 0); + + env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB}); + WaitMemEvent(env); + LogCounters(counters); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); + UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); + + + env->GetMemObserver()->NotifyStat({101*MB, 110*MB, 100*MB}); + WaitMemEvent(env); + LogCounters(counters); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); // 1mb evicted + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); + UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), counters->ActiveLimitBytes->Val()); + + env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB}); + WaitMemEvent(env); + LogCounters(counters); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); + UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); // +5mb + + env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB}); + WaitMemEvent(env); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveBytes->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit + UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit +} + +} // Y_UNIT_TEST_SUITE(TSharedPageCache) + +} // namespace NTabletFlatExecutor +} // namespace NKikimr diff --git a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp index a2516d8947..df307fb627 100644 --- a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp +++ b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp @@ -74,7 +74,7 @@ namespace { { const auto array = Part->Store->PageCollectionArray(Room); - return { ui32(array.at(page).size()), ui32(EPage::Undef) }; + return { array.at(page).size(), ui32(EPage::Undef) }; } NPageCollection::TBorder Bounds(ui32) const noexcept override diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make index eb1528e860..aeae789e3a 100644 --- a/ydb/core/tablet_flat/ut/ya.make +++ b/ydb/core/tablet_flat/ut/ya.make @@ -47,6 +47,7 @@ SRCS( ut_forward.cpp ut_screen.cpp ut_bloom.cpp + ut_shared_sausagecache.cpp ut_slice.cpp ut_slice_loader.cpp ut_versions.cpp diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 7f1df89254..2bc4e424e0 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -2,6 +2,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/mon/mon.h> +#include <ydb/core/base/memobserver.h> #include <library/cpp/actors/testlib/test_runtime.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -31,6 +32,7 @@ namespace NActors { ~TNodeData(); ui64 GetLoggerPoolId() const override; THolder<NActors::TMon> Mon; + TIntrusivePtr<NKikimr::TMemObserver> MemObserver = new NKikimr::TMemObserver; }; struct TNodeFactory: public INodeFactory { @@ -79,6 +81,12 @@ namespace NActors { return f.ExtractValue(); } + TIntrusivePtr<NKikimr::TMemObserver> GetMemObserver(ui32 nodeIndex = 0) { + TGuard<TMutex> guard(Mutex); + auto node = GetNodeById(GetNodeId(nodeIndex)); + return node->MemObserver; + } + void SendToPipe(ui64 tabletId, const TActorId& sender, IEventBase* payload, ui32 nodeIndex = 0, const NKikimr::NTabletPipe::TClientConfig& pipeConfig = NKikimr::NTabletPipe::TClientConfig(), TActorId clientId = TActorId(), ui64 cookie = 0); void SendToPipe(TActorId clientId, const TActorId& sender, IEventBase* payload, diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp index 130446df02..d833168f84 100644 --- a/ydb/core/testlib/basics/services.cpp +++ b/ydb/core/testlib/basics/services.cpp @@ -136,7 +136,7 @@ namespace NPDisk { runtime.AddLocalService(MakeSharedPageCacheId(0), TActorSetupCmd( - CreateSharedPageCache(std::move(pageCollectionCacheConfig)), + CreateSharedPageCache(std::move(pageCollectionCacheConfig), runtime.GetMemObserver(nodeIndex)), TMailboxType::ReadAsFilled, 0), nodeIndex); diff --git a/ydb/core/util/cache_cache.h b/ydb/core/util/cache_cache.h index d94d281b42..2c4fa105a6 100644 --- a/ydb/core/util/cache_cache.h +++ b/ydb/core/util/cache_cache.h @@ -80,22 +80,24 @@ public: , WarmWeight(0) {} - // returns evicted elements as list - TIntrusiveList<TItem> EnsureLimits() { - TIntrusiveList<TItem> evictedList; - LimitFresh(evictedList); - LimitWarm(evictedList); - LimitStaging(evictedList); + TItem* EvictNext() { + TItem* ret = nullptr; - // Note: levels may be rearranged even if nothing was evicted - if (Config.ReportedFresh) - *Config.ReportedFresh = FreshWeight; - if (Config.ReportedWarm) - *Config.ReportedWarm = WarmWeight; - if (Config.ReportedStaging) - *Config.ReportedStaging = StagingWeight; + if (!StagingList.Empty()) { + ret = EvictNext(StagingList, StagingWeight); + if (Config.ReportedStaging) + *Config.ReportedStaging = StagingWeight; + } else if (!FreshList.Empty()) { + ret = EvictNext(FreshList, FreshWeight); + if (Config.ReportedFresh) + *Config.ReportedFresh = FreshWeight; + } else if (!WarmList.Empty()) { + ret = EvictNext(WarmList, WarmWeight); + if (Config.ReportedWarm) + *Config.ReportedWarm = WarmWeight; + } - return evictedList; + return ret; } // returns evicted elements as list @@ -264,6 +266,16 @@ private: } } + TItem* EvictNext(TIntrusiveList<TItem>& list, ui64& weight) { + Y_VERIFY_DEBUG(!list.Empty()); + + TItem *evicted = list.PopBack(); + Unlink(evicted, weight); + GenerationOp.Set(evicted, TCacheCacheConfig::CacheGenEvicted); + + return evicted; + } + private: TCacheCacheConfig Config; diff --git a/ydb/core/util/cache_cache_ut.cpp b/ydb/core/util/cache_cache_ut.cpp index 4efd16bc5b..1e4f5a84a7 100644 --- a/ydb/core/util/cache_cache_ut.cpp +++ b/ydb/core/util/cache_cache_ut.cpp @@ -64,16 +64,63 @@ Y_UNIT_TEST_SUITE(TCacheCacheTest) { UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 1ULL); UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 1ULL); UNIT_ASSERT(evicted.Empty()); + } + + Y_UNIT_TEST(EvictNext) { + TCacheCacheConfig::TCounterPtr fresh = new NMonitoring::TCounterForPtr; + TCacheCacheConfig::TCounterPtr staging = new NMonitoring::TCounterForPtr; + TCacheCacheConfig::TCounterPtr warm = new NMonitoring::TCounterForPtr; + + // 2 pages per layer + TCacheCacheConfig config(3, fresh, staging, warm); + TCacheCache<TPage, TCacheCacheConfig::TDefaultWeight<TPage>, TCacheCacheConfig::TDefaultGeneration<TPage>> cache(config); + + TVector<TPage> pages(6); + + cache.Touch(&pages[0]); + cache.Touch(&pages[1]); + cache.Touch(&pages[2]); + cache.Touch(&pages[3]); + cache.Touch(&pages[0]); + cache.Touch(&pages[1]); + cache.Touch(&pages[4]); + cache.Touch(&pages[5]); + UNIT_ASSERT(pages[0].CacheGeneration == TCacheCacheConfig::CacheGenWarm); + UNIT_ASSERT(pages[1].CacheGeneration == TCacheCacheConfig::CacheGenWarm); + UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenStaging); + UNIT_ASSERT(pages[3].CacheGeneration == TCacheCacheConfig::CacheGenStaging); + UNIT_ASSERT(pages[4].CacheGeneration == TCacheCacheConfig::CacheGenFresh); + UNIT_ASSERT(pages[5].CacheGeneration == TCacheCacheConfig::CacheGenFresh); + UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 2ULL); + UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 2ULL); + UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 2ULL); + + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[2]); + UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 1ULL); - // note: cache eviction is unintuitive at the moment - // all levels are above their limits so EnsureLimits will evict everything - evicted = cache.EnsureLimits(); - UNIT_ASSERT(!evicted.Empty()); + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[3]); + UNIT_ASSERT(pages[3].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 0ULL); + + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[4]); + UNIT_ASSERT(pages[4].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 1ULL); + + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[5]); + UNIT_ASSERT(pages[5].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 0ULL); + + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[0]); UNIT_ASSERT(pages[0].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); + UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 1ULL); + + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[1]); UNIT_ASSERT(pages[1].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); - UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenEvicted); - } + UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 0ULL); + UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), nullptr); + } } } // namespace NKikimr |