aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-07-13 11:25:17 +0300
committerkungasc <kungasc@yandex-team.com>2023-07-13 11:25:17 +0300
commit7f994596c9b79105e75062185e097f690c2342b4 (patch)
tree5d549e7f0b8881c8dac3200147424177ad76251c
parentdfdcf3308ce1eef73cc215b3bf2730292628ff63 (diff)
downloadydb-7f994596c9b79105e75062185e097f690c2342b4.tar.gz
KIKIMR-18537 Shrink shared cache according to free memory
-rw-r--r--ydb/core/base/memobserver.h51
-rw-r--r--ydb/core/base/ya.make1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp9
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h8
-rw-r--r--ydb/core/driver_lib/run/run.cpp5
-rw-r--r--ydb/core/driver_lib/run/run.h3
-rw-r--r--ydb/core/mon_alloc/monitor.cpp34
-rw-r--r--ydb/core/mon_alloc/monitor.h2
-rw-r--r--ydb/core/tablet_flat/flat_bio_actor.h2
-rw-r--r--ydb/core/tablet_flat/flat_sausagecache.cpp10
-rw-r--r--ydb/core/tablet_flat/flat_sausagecache.h14
-rw-r--r--ydb/core/tablet_flat/shared_cache_events.h4
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.cpp264
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.h5
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/runner.h5
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp166
-rw-r--r--ydb/core/tablet_flat/ut/ut_slice_loader.cpp2
-rw-r--r--ydb/core/tablet_flat/ut/ya.make1
-rw-r--r--ydb/core/testlib/actors/test_runtime.h8
-rw-r--r--ydb/core/testlib/basics/services.cpp2
-rw-r--r--ydb/core/util/cache_cache.h40
-rw-r--r--ydb/core/util/cache_cache_ut.cpp59
26 files changed, 569 insertions, 130 deletions
diff --git a/ydb/core/base/memobserver.h b/ydb/core/base/memobserver.h
new file mode 100644
index 0000000000..033fc6aa57
--- /dev/null
+++ b/ydb/core/base/memobserver.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr {
+
+class TMemObserver : public TThrRefBase {
+public:
+ struct TMemStat {
+ ui64 Used;
+ ui64 HardLimit;
+ ui64 SoftLimit;
+ };
+
+ using TCallback = std::function<void()>;
+
+ void Subscribe(TCallback callback) {
+ auto guard = Guard(Mutex);
+ Callbacks.push_back(std::move(callback));
+ }
+
+ void SetStat(TMemStat stat) {
+ auto guard = Guard(Mutex);
+ Stat = stat;
+ }
+
+ void NotifyStat(TMemStat stat) {
+ TVector<TCallback> copy;
+ {
+ auto guard = Guard(Mutex);
+ copy = Callbacks;
+ Stat = stat;
+ }
+
+ for (const auto &c : copy) {
+ c();
+ }
+ }
+
+ TMemStat GetStat() {
+ auto guard = Guard(Mutex);
+ return Stat;
+ }
+
+private:
+ TVector<TCallback> Callbacks;
+ TMutex Mutex;
+ TMemStat Stat;
+};
+
+}
diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make
index e6e2fe75a0..016b772782 100644
--- a/ydb/core/base/ya.make
+++ b/ydb/core/base/ya.make
@@ -34,6 +34,7 @@ SRCS(
location.h
logoblob.cpp
logoblob.h
+ memobserver.h
nameservice.h
path.cpp
pathid.cpp
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 5836e35379..8c158728d7 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1106,8 +1106,9 @@ void TLocalServiceInitializer::InitializeServices(
// TSharedCacheInitializer
-TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConfig)
+TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver)
: IKikimrServicesInitializer(runConfig)
+ , MemObserver(std::move(memObserver))
{}
void TSharedCacheInitializer::InitializeServices(
@@ -1136,7 +1137,7 @@ void TSharedCacheInitializer::InitializeServices(
config->Counters = new TSharedPageCacheCounters(sausageGroup);
setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0),
- TActorSetupCmd(CreateSharedPageCache(std::move(config)), TMailboxType::ReadAsFilled, appData->UserPoolId)));
+ TActorSetupCmd(CreateSharedPageCache(std::move(config), MemObserver), TMailboxType::ReadAsFilled, appData->UserPoolId)));
auto *configurator = NConsole::CreateSharedCacheConfigurator();
setup->LocalServices.emplace_back(TActorId(),
@@ -2004,8 +2005,9 @@ void TPersQueueClusterTrackerInitializer::InitializeServices(NActors::TActorSyst
// TMemProfMonitorInitializer
-TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig)
+TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver)
: IKikimrServicesInitializer(runConfig)
+ , MemObserver(std::move(memObserver))
{}
void TMemProfMonitorInitializer::InitializeServices(
@@ -2019,6 +2021,7 @@ void TMemProfMonitorInitializer::InitializeServices(
}
IActor* monitorActor = CreateMemProfMonitor(
+ MemObserver,
1, // seconds
appData->Counters,
filePathPrefix);
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 3075e8dcb3..ad5aebe6dd 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -85,8 +85,10 @@ public:
};
class TSharedCacheInitializer : public IKikimrServicesInitializer {
+ TIntrusivePtr<TMemObserver> MemObserver;
+
public:
- TSharedCacheInitializer(const TKikimrRunConfig& runConfig);
+ TSharedCacheInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver);
void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override;
};
@@ -354,8 +356,10 @@ public:
};
class TMemProfMonitorInitializer : public IKikimrServicesInitializer {
+ TIntrusivePtr<TMemObserver> MemObserver;
+
public:
- TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig);
+ TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 106a41dd3a..e2c31c8261 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -369,6 +369,7 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr<TModuleFactories> factories)
: ModuleFactories(std::move(factories))
, Counters(MakeIntrusive<::NMonitoring::TDynamicCounters>())
, PollerThreads(new NInterconnect::TPollerThreads)
+ , MemObserver(MakeIntrusive<TMemObserver>())
{
}
@@ -1397,7 +1398,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TLocalServiceInitializer(runConfig));
}
if (serviceMask.EnableSharedCache) {
- sil->AddServiceInitializer(new TSharedCacheInitializer(runConfig));
+ sil->AddServiceInitializer(new TSharedCacheInitializer(runConfig, MemObserver));
}
if (serviceMask.EnableBlobCache) {
sil->AddServiceInitializer(new TBlobCacheInitializer(runConfig));
@@ -1494,7 +1495,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TNetClassifierInitializer(runConfig));
}
- sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig));
+ sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig, MemObserver));
#if defined(ENABLE_MEMORY_TRACKING)
sil->AddServiceInitializer(new TMemoryTrackerInitializer(runConfig));
diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h
index 89dda5dde7..8c6becf2a3 100644
--- a/ydb/core/driver_lib/run/run.h
+++ b/ydb/core/driver_lib/run/run.h
@@ -16,6 +16,7 @@
#include <ydb/core/client/server/grpc_server.h>
#include <ydb/core/fq/libs/shared_resources/interface/shared_resources.h>
#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/base/memobserver.h>
#include <ydb/core/tablet/node_tablet_monitor.h>
#include <ydb/core/tablet/tablet_setup.h>
#include <ydb/core/ymq/http/http.h>
@@ -65,6 +66,8 @@ protected:
std::shared_ptr<TLogBackend> LogBackend;
TAutoPtr<TActorSystem> ActorSystem;
+ TIntrusivePtr<TMemObserver> MemObserver;
+
TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});
virtual ~TKikimrRunner();
diff --git a/ydb/core/mon_alloc/monitor.cpp b/ydb/core/mon_alloc/monitor.cpp
index d6b955f380..7554f671aa 100644
--- a/ydb/core/mon_alloc/monitor.cpp
+++ b/ydb/core/mon_alloc/monitor.cpp
@@ -400,8 +400,11 @@ namespace NKikimr {
struct TDumpLogConfig {
static constexpr double RssUsageHard = 0.9;
static constexpr double RssUsageSoft = 0.85;
+ static constexpr double RssUsageSoftLimit = 0.8;
+ static constexpr double RssUsageNotifySlowLimit = 0.7;
static constexpr TDuration RepeatInterval = TDuration::Seconds(10);
static constexpr TDuration DumpInterval = TDuration::Minutes(10);
+ static constexpr TDuration NotifySlowInterval = TDuration::Seconds(10);
};
enum {
@@ -411,11 +414,13 @@ namespace NKikimr {
struct TEvDumpLogStats : public TEventLocal<TEvDumpLogStats, EvDumpLogStats> {};
+ const TIntrusivePtr<TMemObserver> MemObserver;
const TDuration Interval;
const std::unique_ptr<IAllocMonitor> AllocMonitor;
const TString FilePathPrefix;
TInstant LogMemoryStatsTime = TInstant::Now() - TDumpLogConfig::DumpInterval;
+ TInstant NotifyMemoryStatsTime = TInstant::Now() - TDumpLogConfig::NotifySlowInterval;
bool IsDangerous = false;
@@ -424,8 +429,9 @@ namespace NKikimr {
return EActivityType::ACTORLIB_STATS;
}
- TMemProfMonitor(TDuration interval, std::unique_ptr<IAllocMonitor> allocMonitor, const TString& filePathPrefix)
- : Interval(interval)
+ TMemProfMonitor(TIntrusivePtr<TMemObserver> memObserver, TDuration interval, std::unique_ptr<IAllocMonitor> allocMonitor, const TString& filePathPrefix)
+ : MemObserver(std::move(memObserver))
+ , Interval(interval)
, AllocMonitor(std::move(allocMonitor))
, FilePathPrefix(filePathPrefix)
{}
@@ -487,8 +493,7 @@ namespace NKikimr {
}
}
- void LogMemoryStatsIfNeeded(const TActorContext& ctx) noexcept {
- auto memoryUsage = TAllocState::GetMemoryUsage();
+ void LogMemoryStatsIfNeeded(const TActorContext& ctx, TMemoryUsage memoryUsage) noexcept {
auto usage = memoryUsage.Usage();
LOG_DEBUG_S(ctx, NKikimrServices::MEMORY_PROFILER, memoryUsage.ToString());
if (IsDangerous && usage < TDumpLogConfig::RssUsageSoft) {
@@ -504,7 +509,23 @@ namespace NKikimr {
void HandleWakeup(const TActorContext& ctx) noexcept {
AllocMonitor->Update(Interval);
- LogMemoryStatsIfNeeded(ctx);
+ auto memoryUsage = TAllocState::GetMemoryUsage();
+ LogMemoryStatsIfNeeded(ctx, memoryUsage);
+
+ TMemObserver::TMemStat stat{
+ memoryUsage.AnonRss,
+ memoryUsage.CGroupLimit,
+ static_cast<ui64>(memoryUsage.CGroupLimit * TDumpLogConfig::RssUsageSoftLimit)};
+
+ if (memoryUsage.AnonRss > TDumpLogConfig::RssUsageNotifySlowLimit ||
+ TInstant::Now() - NotifyMemoryStatsTime > TDumpLogConfig::NotifySlowInterval) {
+ NotifyMemoryStatsTime = TInstant::Now();
+ MemObserver->NotifyStat(stat);
+ } else {
+ // fast path: don't call callback, but update current stat
+ MemObserver->SetStat(stat);
+ }
+
ctx.Schedule(Interval, new TEvents::TEvWakeup());
}
@@ -531,8 +552,9 @@ namespace NKikimr {
};
}
- IActor* CreateMemProfMonitor(ui32 intervalSec, TDynamicCountersPtr counters, const TString& filePathPrefix) {
+ IActor* CreateMemProfMonitor(TIntrusivePtr<TMemObserver> memObserver, ui32 intervalSec, TDynamicCountersPtr counters, const TString& filePathPrefix) {
return new TMemProfMonitor(
+ memObserver,
TDuration::Seconds(intervalSec),
CreateAllocMonitor(GetServiceCounters(counters, "utils")),
filePathPrefix);
diff --git a/ydb/core/mon_alloc/monitor.h b/ydb/core/mon_alloc/monitor.h
index 52ade6297a..00e4421f8b 100644
--- a/ydb/core/mon_alloc/monitor.h
+++ b/ydb/core/mon_alloc/monitor.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/base/memobserver.h>
#include <ydb/core/control/immediate_control_board_impl.h>
#include <library/cpp/actors/core/defs.h>
@@ -45,6 +46,7 @@ namespace NKikimr {
};
NActors::IActor* CreateMemProfMonitor(
+ TIntrusivePtr<TMemObserver> memObserver,
ui32 intervalSec,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TString& filePathPrefix = "");
diff --git a/ydb/core/tablet_flat/flat_bio_actor.h b/ydb/core/tablet_flat/flat_bio_actor.h
index caf3025c2e..faed351711 100644
--- a/ydb/core/tablet_flat/flat_bio_actor.h
+++ b/ydb/core/tablet_flat/flat_bio_actor.h
@@ -60,7 +60,7 @@ namespace NBlockIO {
};
template<typename ... TArgs>
- inline void Once(NActors::IActorOps *ops, TActorId service,
+ inline void Start(NActors::IActorOps *ops, TActorId service,
ui64 cookie, TArgs&& ... args) noexcept
{
auto self = ops->Register(new TBlockIO(service, cookie));
diff --git a/ydb/core/tablet_flat/flat_sausagecache.cpp b/ydb/core/tablet_flat/flat_sausagecache.cpp
index fc3a6cbdc3..327807d3fe 100644
--- a/ydb/core/tablet_flat/flat_sausagecache.cpp
+++ b/ydb/core/tablet_flat/flat_sausagecache.cpp
@@ -4,12 +4,12 @@
namespace NKikimr {
namespace NTabletFlatExecutor {
-TPrivatePageCache::TPage::TPage(ui32 size, ui32 pageId, TInfo* info)
+TPrivatePageCache::TPage::TPage(size_t size, ui32 pageId, TInfo* info)
: LoadState(LoadStateNo)
, Sticky(false)
, SharedPending(false)
- , Size(size)
, Id(pageId)
+ , Size(size)
, Info(info)
{}
@@ -462,7 +462,8 @@ void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRe
if (!page)
return;
- Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug");
+ // Note: shared cache may accept a peinding page if it is used by multiple private caches
+ // (for expample, used by tablet and its follower)
if (Y_UNLIKELY(!page->SharedPending)) {
return;
}
@@ -486,7 +487,8 @@ void TPrivatePageCache::DropSharedBody(TInfo *info, ui32 pageId) {
if (!page)
return;
- Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug");
+ // Note: shared cache may drop a peinding page if it is used by multiple private caches
+ // (for expample, used by tablet and its follower)
if (Y_UNLIKELY(page->SharedPending)) {
// Shared cache rejected our page so we should drop it too
Stats.TotalSharedPending -= page->Size;
diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h
index 3a1b830e2e..bec3c919e2 100644
--- a/ydb/core/tablet_flat/flat_sausagecache.h
+++ b/ydb/core/tablet_flat/flat_sausagecache.h
@@ -48,12 +48,12 @@ public:
LoadStateRequestedAsync,
};
- ui64 LoadState : 2;
- ui64 Sticky : 1;
- ui64 SharedPending : 1;
- ui64 : 4; // padding
- const ui64 Size : 24;
- const ui64 Id : 32;
+ ui32 LoadState : 2;
+ ui32 Sticky : 1;
+ ui32 SharedPending : 1;
+
+ const ui32 Id;
+ const size_t Size;
TInfo* const Info;
TIntrusivePtr<TPrivatePageCachePinPad> PinPad;
@@ -61,7 +61,7 @@ public:
TSharedPageRef SharedBody;
TSharedData PinnedBody;
- TPage(ui32 size, ui32 pageId, TInfo* info);
+ TPage(size_t size, ui32 pageId, TInfo* info);
TPage(const TPage&) = delete;
TPage(TPage&&) = delete;
diff --git a/ydb/core/tablet_flat/shared_cache_events.h b/ydb/core/tablet_flat/shared_cache_events.h
index e7ed55be96..a7abe542ef 100644
--- a/ydb/core/tablet_flat/shared_cache_events.h
+++ b/ydb/core/tablet_flat/shared_cache_events.h
@@ -26,6 +26,7 @@ namespace NSharedCache {
EvRequest,
EvResult,
EvUpdated,
+ EvMem,
EvEnd
@@ -130,6 +131,9 @@ namespace NSharedCache {
THashMap<TLogoBlobID, TActions> Actions;
};
+ struct TEvMem : public TEventLocal<TEvMem, EvMem> {
+ };
+
}
}
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp
index c0cb5e7987..40d86df5fd 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/shared_sausagecache.cpp
@@ -13,8 +13,10 @@
namespace NKikimr {
TSharedPageCacheCounters::TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group)
- : ActivePages(group->GetCounter("ActivePages"))
+ : MemLimitBytes(group->GetCounter("MemLimitBytes"))
+ , ActivePages(group->GetCounter("ActivePages"))
, ActiveBytes(group->GetCounter("ActiveBytes"))
+ , ActiveLimitBytes(group->GetCounter("ActiveLimitBytes"))
, PassivePages(group->GetCounter("PassivePages"))
, PassiveBytes(group->GetCounter("PassiveBytes"))
, RequestedPages(group->GetCounter("RequestedPages", true))
@@ -39,9 +41,7 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) {
return false;
}
-class TSharedPageCache : public TActor<TSharedPageCache> {
- THolder<TSharedPageCacheConfig> Config;
-
+class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
using ELnLev = NUtil::ELnLev;
using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>;
@@ -60,20 +60,21 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
: public TSharedPageHandle
, public TIntrusiveListItem<TPage>
{
- ui64 State : 4;
- ui64 CacheGeneration : 3;
- ui64 InMemory : 1;
- ui64 Size : 24;
- const ui64 PageId : 32;
+ ui32 State : 4;
+ ui32 CacheGeneration : 3;
+ ui32 InMemory : 1;
+
+ const ui32 PageId;
+ const size_t Size;
TCollection* Collection;
- TPage(ui32 pageId, TCollection* collection)
+ TPage(ui32 pageId, size_t size, TCollection* collection)
: State(PageStateNo)
, CacheGeneration(TCacheCacheConfig::CacheGenNone)
, InMemory(false)
- , Size(0)
, PageId(pageId)
+ , Size(size)
, Collection(collection)
{}
@@ -163,11 +164,13 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
TAutoPtr<NUtil::ILogger> Logger;
THashMap<TLogoBlobID, TCollection> Collections;
THashMap<TActorId, TCollectionsOwner> CollectionsOwners;
+ TIntrusivePtr<TMemObserver> MemObserver;
TRequestQueue AsyncRequests;
TRequestQueue ScanRequests;
- TCacheCache<TPage, TPage::TWeight> Cache;
+ THolder<TSharedPageCacheConfig> Config;
+ TCacheCache<TPage, TPage::TWeight, TCacheCacheConfig::TDefaultGeneration<TPage>> Cache;
TControlWrapper SizeOverride;
@@ -176,27 +179,90 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
ui64 StatHitBytes = 0;
ui64 StatMissPages = 0;
ui64 StatMissBytes = 0;
+ ui64 StatActiveBytes = 0;
ui64 StatPassiveBytes = 0;
+ ui64 StatLoadInFlyBytes = 0;
bool GCScheduled = false;
+ // 0 means unlimited
+ ui64 MemLimitBytes = 0;
+ ui64 ConfigLimitBytes;
+
void ActualizeCacheSizeLimit() {
if ((ui64)SizeOverride != Config->CacheConfig->Limit) {
Config->CacheConfig->SetLimit(SizeOverride);
}
- ui64 limit = Config->CacheConfig->Limit;
- limit = Max<ui64>(1,
- limit * Config->ActivePagesReservationPercent / 100,
- limit > StatPassiveBytes ? limit - StatPassiveBytes : 0);
+ ConfigLimitBytes = Config->CacheConfig->Limit;
+ ui64 limit = ConfigLimitBytes;
+ if (MemLimitBytes && ConfigLimitBytes > MemLimitBytes) {
+ limit = MemLimitBytes;
+ }
+
+ // limit of cache depends only on config and mem because passive pages may go in and out arbitrary
+ // we may have some passive bytes, so if we fully fill this Cache we may exceed the limit
+ // because of that DoGC should be called to ensure limits
Cache.UpdateCacheSize(limit);
- Evict(Cache.EnsureLimits());
+ if (Config->Counters) {
+ Config->Counters->ActiveLimitBytes->Set(limit);
+ }
+ }
+
+ void DoGC() {
+ // maybe we already have enough useless pages
+ // update StatActiveBytes + StatPassiveBytes
+ ProcessGCList();
+
+ ui64 configActiveReservedBytes = ConfigLimitBytes * Config->ActivePagesReservationPercent / 100;
+
+ THashSet<TCollection*> recheck;
+ while (MemLimitBytes && GetStatAllBytes() > MemLimitBytes
+ || GetStatAllBytes() > ConfigLimitBytes && StatActiveBytes > configActiveReservedBytes) {
+ auto page = Cache.EvictNext();
+ if (!page) {
+ break;
+ }
+ EvictNow(page, recheck);
+ }
+ if (recheck) {
+ CheckExpiredCollections(std::move(recheck));
+ }
+ }
+
+ void Handle(NSharedCache::TEvMem::TPtr &) {
+ // always get the latest value
+ auto mem = MemObserver->GetStat();
+
+ if (mem.SoftLimit) {
+ ui64 usedExternal = 0;
+ // we have: mem.Used = usedExternal + StatAllBytes
+ if (mem.Used > GetStatAllBytes()) {
+ usedExternal = mem.Used - GetStatAllBytes();
+ }
+
+ // we want: MemLimitBytes + externalUsage <= mem.SoftLimit
+ MemLimitBytes = mem.SoftLimit > usedExternal
+ ? mem.SoftLimit - usedExternal
+ : 1;
+ } else {
+ MemLimitBytes = 0;
+ }
+
+ if (Config->Counters) {
+ Config->Counters->MemLimitBytes->Set(MemLimitBytes);
+ }
+
+ ActualizeCacheSizeLimit();
+
+ DoGC();
}
void Registered(TActorSystem *sys, const TActorId &owner)
{
+ NActors::TActorBootstrapped<TSharedPageCache>::Registered(sys, owner);
Owner = owner;
Logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_SAUSAGECACHE);
@@ -286,8 +352,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
metaId.ToString().c_str(), pageId, collection.PageMap.size());
auto* page = collection.PageMap[pageId].Get();
if (!page) {
- Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, &collection))));
- page->Size = pageCollection.Page(pageId).Size;
+ Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, pageCollection.Page(pageId).Size, &collection))));
}
if (Config->Counters) {
@@ -415,12 +480,9 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
if (queue) {
RequestFromQueue(*queue);
} else {
- if (Config->Counters) {
- *Config->Counters->LoadInFlyPages += pagesToRequest.size();
- *Config->Counters->LoadInFlyBytes += pagesToRequestBytes;
- }
+ AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes);
auto *fetch = new NPageCollection::TFetch(0, waitingRequest->PageCollection, std::move(pagesToRequest));
- NBlockIO::Once(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
+ NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
}
}
} else {
@@ -432,7 +494,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
SendReadyBlocks(*waitingRequest);
}
- ProcessGCList();
+ DoGC();
}
void RequestFromQueue(TRequestQueue &queue) {
@@ -500,10 +562,11 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
<< " pages " << toLoad;
}
+ AddInFlyPages(toLoad.size(), sizeToLoad);
// fetch cookie -> requested size;
// event cookie -> ptr to queue
auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad));
- NBlockIO::Once(this, wa.Owner, (ui64)&queue, wa.Priority, fetch);
+ NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch);
}
}
}
@@ -565,8 +628,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
auto* page = collection.PageMap[pageId].Get();
if (!page) {
if (x.second) {
- Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, &collection))));
- page->Size = x.second.size();
+ Y_VERIFY(collection.PageMap.emplace(pageId, (page = new TPage(pageId, x.second.size(), &collection))));
} else {
continue;
}
@@ -612,7 +674,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
Send(ev->Sender, msg.Release());
}
- ProcessGCList();
+ DoGC();
}
void Handle(NSharedCache::TEvUnregister::TPtr &ev) {
@@ -697,9 +759,8 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
if (!page || !page->HasMissingBody())
continue;
- if (page->State == PageStateRequested && Config->Counters) {
- --*Config->Counters->LoadInFlyPages;
- *Config->Counters->LoadInFlyBytes -= page->Size;
+ if (page->State == PageStateRequested || page->State == PageStateRequestedAsync) {
+ RemoveInFlyPage(page);
}
page->Initialize(std::move(paged.Data));
@@ -708,7 +769,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
}
- ProcessGCList();
+ DoGC();
}
void TryDropExpiredCollection(TCollection* collection) {
@@ -733,28 +794,38 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
while (auto rawPage = GCList->PopGC()) {
auto* page = static_cast<TPage*>(rawPage.Get());
- if (page->TryDrop()) {
- // We have successfully dropped the page
- // We are guaranteed no new uses for this page are possible
- Y_VERIFY(page->State == PageStateEvicted);
- RemovePassivePage(page);
-
- Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection");
- if (auto* collection = page->Collection) {
- Y_VERIFY_DEBUG(collection->PageMap[page->PageId].Get() == page);
- Y_VERIFY(collection->PageMap.erase(page->PageId));
- if (collection->Owners) {
- collection->DroppedPages.push_back(page->PageId);
- }
- recheck.insert(collection);
- }
- }
+ TryDrop(page, recheck);
}
if (recheck) {
CheckExpiredCollections(std::move(recheck));
}
+ TryScheduleGC();
+ }
+
+ void TryDrop(TPage* page, THashSet<TCollection*>& recheck) {
+ if (page->TryDrop()) {
+ // We have successfully dropped the page
+ // We are guaranteed no new uses for this page are possible
+ Y_VERIFY(page->State == PageStateEvicted);
+ RemovePassivePage(page);
+
+ Y_VERIFY_DEBUG_S(page->Collection, "Evicted pages are expected to have collection");
+ if (auto* collection = page->Collection) {
+ auto pageId = page->PageId;
+ Y_VERIFY_DEBUG(collection->PageMap[pageId].Get() == page);
+ Y_VERIFY(collection->PageMap.erase(pageId));
+ // Note: don't use page after erase as it may be deleted
+ if (collection->Owners) {
+ collection->DroppedPages.push_back(pageId);
+ }
+ recheck.insert(collection);
+ }
+ }
+ }
+
+ void TryScheduleGC() {
if (!GCScheduled) {
TActivationContext::AsActorContext().Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup());
GCScheduled = true;
@@ -858,11 +929,10 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
continue;
}
- if (page->State == PageStateRequested && Config->Counters) {
+ if (page->State == PageStateRequested) {
// Request is technically inflight, but response will be ignored
// Pretend request is cancelled for simplicity
- --*Config->Counters->LoadInFlyPages;
- *Config->Counters->LoadInFlyBytes -= page->Size;
+ RemoveInFlyPage(page);
}
page->Collection = nullptr;
@@ -933,36 +1003,35 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
void Evict(TIntrusiveList<TPage>&& pages) {
while (!pages.Empty()) {
TPage* page = pages.PopFront();
- Y_VERIFY(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+
+ Y_VERIFY_S(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "unexpected " << page->CacheGeneration << " page cache generation");
page->CacheGeneration = TCacheCacheConfig::CacheGenNone;
- switch (page->State) {
- case PageStateNo:
- Y_FAIL("unexpected uninitialized page");
-
- case PageStateEvicted:
- Y_FAIL("unexpected evicted page");
-
- case PageStateRequested:
- case PageStateRequestedAsync:
- case PageStatePending:
- break;
-
- case PageStateLoaded:
- page->State = PageStateEvicted;
- RemoveActivePage(page);
- AddPassivePage(page);
- if (page->UnUse()) {
- GCList->PushGC(page);
- }
- break;
-
- default:
- Y_FAIL("unknown load state");
+ Y_VERIFY_S(page->State == PageStateLoaded, "unexpected " << page->State << " page state");
+ page->State = PageStateEvicted;
+
+ RemoveActivePage(page);
+ AddPassivePage(page);
+ if (page->UnUse()) {
+ GCList->PushGC(page);
}
}
}
+ void EvictNow(TPage* page, THashSet<TCollection*>& recheck) {
+ Y_VERIFY_S(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "unexpected " << page->CacheGeneration << " page cache generation");
+ page->CacheGeneration = TCacheCacheConfig::CacheGenNone;
+
+ Y_VERIFY_S(page->State == PageStateLoaded, "unexpected " << page->State << " page state");
+ page->State = PageStateEvicted;
+
+ RemoveActivePage(page);
+ AddPassivePage(page);
+ if (page->UnUse()) {
+ TryDrop(page, recheck);
+ }
+ }
+
void Handle(TEvSharedPageCache::TEvConfigure::TPtr& ev) {
const auto* msg = ev->Get();
@@ -987,7 +1056,12 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
}
+ inline ui64 GetStatAllBytes() const {
+ return StatActiveBytes + StatPassiveBytes + StatLoadInFlyBytes;
+ }
+
inline void AddActivePage(const TPage* page) {
+ StatActiveBytes += sizeof(TPage) + page->Size;
if (Config->Counters) {
++*Config->Counters->ActivePages;
*Config->Counters->ActiveBytes += sizeof(TPage) + page->Size;
@@ -1002,7 +1076,17 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
}
+ inline void AddInFlyPages(ui64 count, ui64 size) {
+ StatLoadInFlyBytes += sizeof(TPage) * count + size;
+ if (Config->Counters) {
+ *Config->Counters->LoadInFlyPages += count;
+ *Config->Counters->LoadInFlyBytes += sizeof(TPage) * count + size;
+ }
+ }
+
inline void RemoveActivePage(const TPage* page) {
+ Y_VERIFY_DEBUG(StatActiveBytes >= sizeof(TPage) + page->Size);
+ StatActiveBytes -= sizeof(TPage) + page->Size;
if (Config->Counters) {
--*Config->Counters->ActivePages;
*Config->Counters->ActiveBytes -= sizeof(TPage) + page->Size;
@@ -1010,6 +1094,7 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
inline void RemovePassivePage(const TPage* page) {
+ Y_VERIFY_DEBUG(StatPassiveBytes >= sizeof(TPage) + page->Size);
StatPassiveBytes -= sizeof(TPage) + page->Size;
if (Config->Counters) {
--*Config->Counters->PassivePages;
@@ -1017,17 +1102,35 @@ class TSharedPageCache : public TActor<TSharedPageCache> {
}
}
+ inline void RemoveInFlyPage(const TPage* page) {
+ Y_VERIFY_DEBUG(StatLoadInFlyBytes >= sizeof(TPage) + page->Size);
+ StatLoadInFlyBytes -= sizeof(TPage) + page->Size;
+ if (Config->Counters) {
+ --*Config->Counters->LoadInFlyPages;
+ *Config->Counters->LoadInFlyBytes -= sizeof(TPage) + page->Size;
+ }
+ }
+
public:
- TSharedPageCache(THolder<TSharedPageCacheConfig> config)
- : TActor(&TThis::StateFunc)
+ TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver)
+ : MemObserver(std::move(memObserver))
, Config(std::move(config))
, Cache(*Config->CacheConfig)
, SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>())
+ , ConfigLimitBytes(Config->CacheConfig->Limit)
{
AsyncRequests.Limit = Config->TotalAsyncQueueInFlyLimit;
ScanRequests.Limit = Config->TotalScanQueueInFlyLimit;
}
+ void Bootstrap() {
+ MemObserver->Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()] () {
+ actorSystem->Send(selfId, new NSharedCache::TEvMem());
+ });
+
+ Become(&TThis::StateFunc);
+ }
+
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(NSharedCache::TEvAttach, Handle);
@@ -1035,6 +1138,7 @@ public:
hFunc(NSharedCache::TEvTouch, Handle);
hFunc(NSharedCache::TEvUnregister, Handle);
hFunc(NSharedCache::TEvInvalidate, Handle);
+ hFunc(NSharedCache::TEvMem, Handle);
hFunc(NBlockIO::TEvData, Handle);
hFunc(TEvSharedPageCache::TEvConfigure, Handle);
cFunc(TEvents::TSystem::Wakeup, GCWakeup);
@@ -1049,8 +1153,8 @@ public:
} // NTabletFlatExecutor
-IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config) {
- return new NTabletFlatExecutor::TSharedPageCache(std::move(config));
+IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver) {
+ return new NTabletFlatExecutor::TSharedPageCache(std::move(config), std::move(memObserver));
}
}
diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h
index cf5e2f1c14..28ae0825b4 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.h
+++ b/ydb/core/tablet_flat/shared_sausagecache.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include <ydb/core/base/memobserver.h>
#include <ydb/core/protos/shared_cache.pb.h>
#include <ydb/core/util/cache_cache.h>
#include <util/system/unaligned_mem.h>
@@ -23,8 +24,10 @@ struct TEvSharedPageCache {
struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheCounters> {
using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
+ const TCounterPtr MemLimitBytes;
const TCounterPtr ActivePages;
const TCounterPtr ActiveBytes;
+ const TCounterPtr ActiveLimitBytes;
const TCounterPtr PassivePages;
const TCounterPtr PassiveBytes;
const TCounterPtr RequestedPages;
@@ -48,7 +51,7 @@ struct TSharedPageCacheConfig {
ui32 ActivePagesReservationPercent = 50;
};
-IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config);
+IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver);
inline TActorId MakeSharedPageCacheId(ui64 id = 0) {
char x[12] = { 's', 'h', 's', 'c' };
diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h
index 5196950aa3..5361c1f0c2 100644
--- a/ydb/core/tablet_flat/test/libs/exec/runner.h
+++ b/ydb/core/tablet_flat/test/libs/exec/runner.h
@@ -163,7 +163,7 @@ namespace NFake {
{ /*_ Resource broker service, used for generic scans */
using namespace NResourceBroker;
- auto *actor = CreateResourceBrokerActor(MakeDefaultConfig(), Env.GetDynamicCounters(0));
+ auto *actor = CreateResourceBrokerActor(MakeDefaultConfig(), Env.GetDynamicCounters());
AddService(MakeResourceBrokerID(), actor, EMail::Revolving);
}
@@ -183,8 +183,9 @@ namespace NFake {
config->CacheConfig = new TCacheCacheConfig(conf.Shared, nullptr, nullptr, nullptr);
config->TotalAsyncQueueInFlyLimit = conf.AsyncQueue;
config->TotalScanQueueInFlyLimit = conf.ScanQueue;
+ config->Counters = MakeIntrusive<TSharedPageCacheCounters>(Env.GetDynamicCounters());
- auto *actor = CreateSharedPageCache(std::move(config));
+ auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver());
RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled);
}
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
index e76cc7340f..f5e5f0917d 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
@@ -71,6 +71,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
index 232fb00f89..4723dfaccd 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
@@ -74,6 +74,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
index 4f9b399a40..233cb18aa6 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
@@ -75,6 +75,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
index 716aa62ec4..b8a2359330 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
@@ -64,6 +64,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_forward.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
new file mode 100644
index 0000000000..26bdf4f71e
--- /dev/null
+++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
@@ -0,0 +1,166 @@
+#include "flat_executor_ut_common.h"
+
+#include <util/system/sanitizers.h>
+
+namespace NKikimr {
+namespace NTabletFlatExecutor {
+
+Y_UNIT_TEST_SUITE(TSharedPageCache) {
+
+constexpr i64 MB = 1024*1024;
+
+enum : ui32 {
+ TableId = 101,
+ KeyColumnId = 1,
+ ValueColumnId = 2,
+};
+
+struct TTxInitSchema : public ITransaction {
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ if (txc.DB.GetScheme().GetTableInfo(TableId))
+ return true;
+
+ txc.DB.Alter()
+ .AddTable("test" + ToString(ui32(TableId)), TableId)
+ .AddColumn(TableId, "key", KeyColumnId, NScheme::TInt64::TypeId, false)
+ .AddColumn(TableId, "value", ValueColumnId, NScheme::TString::TypeId, false)
+ .AddColumnToKey(TableId, KeyColumnId);
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+};
+
+struct TTxWriteRow : public ITransaction {
+ i64 Key;
+ TString Value;
+
+ explicit TTxWriteRow(i64 key, TString value)
+ : Key(key)
+ , Value(std::move(value))
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ const auto key = NScheme::TInt64::TInstance(Key);
+
+ const auto val = NScheme::TString::TInstance(Value);
+ NTable::TUpdateOp ops{ ValueColumnId, NTable::ECellOp::Set, val };
+
+ txc.DB.Update(TableId, NTable::ERowOp::Upsert, { key }, { ops });
+
+ return true;
+ }
+
+ void Complete(const TActorContext&ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+};
+
+struct TTxReadRow : public ITransaction {
+ i64 Key;
+
+ explicit TTxReadRow(i64 key)
+ : Key(key)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ TVector<TRawTypeValue> rawKey;
+ rawKey.emplace_back(&Key, sizeof(Key), NScheme::TInt64::TypeId);
+
+ TVector<NTable::TTag> tags;
+ tags.push_back(KeyColumnId);
+ tags.push_back(ValueColumnId);
+
+ NTable::TRowState row;
+ auto ready = txc.DB.Select(TableId, rawKey, tags, row);
+ if (ready == NTable::EReady::Page) {
+ return false;
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+};
+
+void LogCounters(TIntrusivePtr<TSharedPageCacheCounters> counters) {
+ Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val() << ", Passive:" << counters->PassiveBytes->Val() << ", MemLimit:" << counters->MemLimitBytes->Val() << Endl;
+}
+
+void WaitMemEvent(TMyEnvBase& env) {
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1));
+ env->DispatchEvents(options);
+}
+
+Y_UNIT_TEST(Limits) {
+ TMyEnvBase env;
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters());
+
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+ env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() });
+
+ // write 300 rows, each ~100KB
+ for (i64 key = 0; key < 300; ++key) {
+ TString value(size_t(100 * 1024), char('a' + key % 26));
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) });
+ }
+
+ Cerr << "...compacting" << Endl;
+ env.SendSync(new NFake::TEvCompact(TableId));
+ Cerr << "...waiting until compacted" << Endl;
+ env.WaitFor<NFake::TEvCompacted>();
+
+ for (i64 key = 0; key < 100; ++key) {
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key) });
+ }
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); // 2 full layers (fresh & staging)
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
+ UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 0);
+
+ env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB});
+ WaitMemEvent(env);
+ LogCounters(counters);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
+ UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val());
+
+
+ env->GetMemObserver()->NotifyStat({101*MB, 110*MB, 100*MB});
+ WaitMemEvent(env);
+ LogCounters(counters);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); // 1mb evicted
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2);
+ UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), counters->ActiveLimitBytes->Val());
+
+ env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB});
+ WaitMemEvent(env);
+ LogCounters(counters);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
+ UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); // +5mb
+
+ env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB});
+ WaitMemEvent(env);
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveBytes->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit
+ UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit
+}
+
+} // Y_UNIT_TEST_SUITE(TSharedPageCache)
+
+} // namespace NTabletFlatExecutor
+} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
index a2516d8947..df307fb627 100644
--- a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
+++ b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
@@ -74,7 +74,7 @@ namespace {
{
const auto array = Part->Store->PageCollectionArray(Room);
- return { ui32(array.at(page).size()), ui32(EPage::Undef) };
+ return { array.at(page).size(), ui32(EPage::Undef) };
}
NPageCollection::TBorder Bounds(ui32) const noexcept override
diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make
index eb1528e860..aeae789e3a 100644
--- a/ydb/core/tablet_flat/ut/ya.make
+++ b/ydb/core/tablet_flat/ut/ya.make
@@ -47,6 +47,7 @@ SRCS(
ut_forward.cpp
ut_screen.cpp
ut_bloom.cpp
+ ut_shared_sausagecache.cpp
ut_slice.cpp
ut_slice_loader.cpp
ut_versions.cpp
diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h
index 7f1df89254..2bc4e424e0 100644
--- a/ydb/core/testlib/actors/test_runtime.h
+++ b/ydb/core/testlib/actors/test_runtime.h
@@ -2,6 +2,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/mon/mon.h>
+#include <ydb/core/base/memobserver.h>
#include <library/cpp/actors/testlib/test_runtime.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -31,6 +32,7 @@ namespace NActors {
~TNodeData();
ui64 GetLoggerPoolId() const override;
THolder<NActors::TMon> Mon;
+ TIntrusivePtr<NKikimr::TMemObserver> MemObserver = new NKikimr::TMemObserver;
};
struct TNodeFactory: public INodeFactory {
@@ -79,6 +81,12 @@ namespace NActors {
return f.ExtractValue();
}
+ TIntrusivePtr<NKikimr::TMemObserver> GetMemObserver(ui32 nodeIndex = 0) {
+ TGuard<TMutex> guard(Mutex);
+ auto node = GetNodeById(GetNodeId(nodeIndex));
+ return node->MemObserver;
+ }
+
void SendToPipe(ui64 tabletId, const TActorId& sender, IEventBase* payload, ui32 nodeIndex = 0,
const NKikimr::NTabletPipe::TClientConfig& pipeConfig = NKikimr::NTabletPipe::TClientConfig(), TActorId clientId = TActorId(), ui64 cookie = 0);
void SendToPipe(TActorId clientId, const TActorId& sender, IEventBase* payload,
diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp
index 130446df02..d833168f84 100644
--- a/ydb/core/testlib/basics/services.cpp
+++ b/ydb/core/testlib/basics/services.cpp
@@ -136,7 +136,7 @@ namespace NPDisk {
runtime.AddLocalService(MakeSharedPageCacheId(0),
TActorSetupCmd(
- CreateSharedPageCache(std::move(pageCollectionCacheConfig)),
+ CreateSharedPageCache(std::move(pageCollectionCacheConfig), runtime.GetMemObserver(nodeIndex)),
TMailboxType::ReadAsFilled,
0),
nodeIndex);
diff --git a/ydb/core/util/cache_cache.h b/ydb/core/util/cache_cache.h
index d94d281b42..2c4fa105a6 100644
--- a/ydb/core/util/cache_cache.h
+++ b/ydb/core/util/cache_cache.h
@@ -80,22 +80,24 @@ public:
, WarmWeight(0)
{}
- // returns evicted elements as list
- TIntrusiveList<TItem> EnsureLimits() {
- TIntrusiveList<TItem> evictedList;
- LimitFresh(evictedList);
- LimitWarm(evictedList);
- LimitStaging(evictedList);
+ TItem* EvictNext() {
+ TItem* ret = nullptr;
- // Note: levels may be rearranged even if nothing was evicted
- if (Config.ReportedFresh)
- *Config.ReportedFresh = FreshWeight;
- if (Config.ReportedWarm)
- *Config.ReportedWarm = WarmWeight;
- if (Config.ReportedStaging)
- *Config.ReportedStaging = StagingWeight;
+ if (!StagingList.Empty()) {
+ ret = EvictNext(StagingList, StagingWeight);
+ if (Config.ReportedStaging)
+ *Config.ReportedStaging = StagingWeight;
+ } else if (!FreshList.Empty()) {
+ ret = EvictNext(FreshList, FreshWeight);
+ if (Config.ReportedFresh)
+ *Config.ReportedFresh = FreshWeight;
+ } else if (!WarmList.Empty()) {
+ ret = EvictNext(WarmList, WarmWeight);
+ if (Config.ReportedWarm)
+ *Config.ReportedWarm = WarmWeight;
+ }
- return evictedList;
+ return ret;
}
// returns evicted elements as list
@@ -264,6 +266,16 @@ private:
}
}
+ TItem* EvictNext(TIntrusiveList<TItem>& list, ui64& weight) {
+ Y_VERIFY_DEBUG(!list.Empty());
+
+ TItem *evicted = list.PopBack();
+ Unlink(evicted, weight);
+ GenerationOp.Set(evicted, TCacheCacheConfig::CacheGenEvicted);
+
+ return evicted;
+ }
+
private:
TCacheCacheConfig Config;
diff --git a/ydb/core/util/cache_cache_ut.cpp b/ydb/core/util/cache_cache_ut.cpp
index 4efd16bc5b..1e4f5a84a7 100644
--- a/ydb/core/util/cache_cache_ut.cpp
+++ b/ydb/core/util/cache_cache_ut.cpp
@@ -64,16 +64,63 @@ Y_UNIT_TEST_SUITE(TCacheCacheTest) {
UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 1ULL);
UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 1ULL);
UNIT_ASSERT(evicted.Empty());
+ }
+
+ Y_UNIT_TEST(EvictNext) {
+ TCacheCacheConfig::TCounterPtr fresh = new NMonitoring::TCounterForPtr;
+ TCacheCacheConfig::TCounterPtr staging = new NMonitoring::TCounterForPtr;
+ TCacheCacheConfig::TCounterPtr warm = new NMonitoring::TCounterForPtr;
+
+ // 2 pages per layer
+ TCacheCacheConfig config(3, fresh, staging, warm);
+ TCacheCache<TPage, TCacheCacheConfig::TDefaultWeight<TPage>, TCacheCacheConfig::TDefaultGeneration<TPage>> cache(config);
+
+ TVector<TPage> pages(6);
+
+ cache.Touch(&pages[0]);
+ cache.Touch(&pages[1]);
+ cache.Touch(&pages[2]);
+ cache.Touch(&pages[3]);
+ cache.Touch(&pages[0]);
+ cache.Touch(&pages[1]);
+ cache.Touch(&pages[4]);
+ cache.Touch(&pages[5]);
+ UNIT_ASSERT(pages[0].CacheGeneration == TCacheCacheConfig::CacheGenWarm);
+ UNIT_ASSERT(pages[1].CacheGeneration == TCacheCacheConfig::CacheGenWarm);
+ UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenStaging);
+ UNIT_ASSERT(pages[3].CacheGeneration == TCacheCacheConfig::CacheGenStaging);
+ UNIT_ASSERT(pages[4].CacheGeneration == TCacheCacheConfig::CacheGenFresh);
+ UNIT_ASSERT(pages[5].CacheGeneration == TCacheCacheConfig::CacheGenFresh);
+ UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 2ULL);
+ UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 2ULL);
+ UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 2ULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[2]);
+ UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+ UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 1ULL);
- // note: cache eviction is unintuitive at the moment
- // all levels are above their limits so EnsureLimits will evict everything
- evicted = cache.EnsureLimits();
- UNIT_ASSERT(!evicted.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[3]);
+ UNIT_ASSERT(pages[3].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+ UNIT_ASSERT_VALUES_EQUAL(staging->Val(), 0ULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[4]);
+ UNIT_ASSERT(pages[4].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+ UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 1ULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[5]);
+ UNIT_ASSERT(pages[5].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+ UNIT_ASSERT_VALUES_EQUAL(fresh->Val(), 0ULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[0]);
UNIT_ASSERT(pages[0].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
+ UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 1ULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), &pages[1]);
UNIT_ASSERT(pages[1].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
- UNIT_ASSERT(pages[2].CacheGeneration == TCacheCacheConfig::CacheGenEvicted);
- }
+ UNIT_ASSERT_VALUES_EQUAL(warm->Val(), 0ULL);
+ UNIT_ASSERT_VALUES_EQUAL(cache.EvictNext(), nullptr);
+ }
}
} // namespace NKikimr