aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-07-26 13:26:24 +0300
committerkungasc <kungasc@yandex-team.com>2023-07-26 13:26:24 +0300
commit2c2762d53cba845d7628d73b4a5077e026816f8b (patch)
tree344322e96a280f24499339d3e54f5eaf73554170
parent2ccacfcb71600c3a39b95a255753c0335dbfc95d (diff)
downloadydb-2c2762d53cba845d7628d73b4a5077e026816f8b.tar.gz
KIKIMR-18716 Limit MemTable total consumption
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp7
-rw-r--r--ydb/core/protos/shared_cache.proto1
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp52
-rw-r--r--ydb/core/tablet_flat/flat_executor.h3
-rw-r--r--ydb/core/tablet_flat/flat_executor_compaction_logic.cpp56
-rw-r--r--ydb/core/tablet_flat/flat_executor_compaction_logic.h8
-rw-r--r--ydb/core/tablet_flat/shared_cache_events.h49
-rw-r--r--ydb/core/tablet_flat/shared_cache_memtable.h23
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.cpp183
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.h3
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/runner.h2
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp129
-rw-r--r--ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp374
-rw-r--r--ydb/core/tablet_flat/ut/ya.make1
-rw-r--r--ydb/core/tablet_flat/ya.make1
19 files changed, 879 insertions, 17 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index c790d982684..9857c6fbb5c 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1128,6 +1128,13 @@ void TSharedCacheInitializer::InitializeServices(
config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit();
config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit();
+ if (cfg.HasActivePagesReservationPercent()) {
+ config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
+ }
+ if (cfg.HasMemTableReservationPercent()) {
+ config->MemTableReservationPercent = cfg.GetMemTableReservationPercent();
+ }
+
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE");
diff --git a/ydb/core/protos/shared_cache.proto b/ydb/core/protos/shared_cache.proto
index 8cca85c0f90..33758ea0980 100644
--- a/ydb/core/protos/shared_cache.proto
+++ b/ydb/core/protos/shared_cache.proto
@@ -6,4 +6,5 @@ message TSharedCacheConfig {
optional uint64 ScanQueueInFlyLimit = 2 [default = 536870912];
optional uint64 AsyncQueueInFlyLimit = 3 [default = 536870912];
optional uint32 ActivePagesReservationPercent = 4;
+ optional uint32 MemTableReservationPercent = 5;
}
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 449908f81e4..4851cde9e50 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -23,6 +23,7 @@
#include "flat_abi_evol.h"
#include "probes.h"
#include "shared_sausagecache.h"
+#include "shared_cache_memtable.h"
#include "util_fmt_desc.h"
#include <ydb/core/base/appdata.h>
@@ -57,6 +58,36 @@ struct TCompactionChangesCtx {
{ }
};
+class TSharedPageCacheMemTableObserver : public NSharedCache::ISharedPageCacheMemTableObserver {
+public:
+ TSharedPageCacheMemTableObserver(TActorSystem* actorSystem, TActorId owner)
+ : ActorSystem(actorSystem)
+ , Owner(owner)
+ , SharedCacheId(MakeSharedPageCacheId())
+ {}
+
+ void Register(ui32 table) override {
+ Send(new NSharedCache::TEvMemTableRegister(table));
+ }
+
+ void Unregister(ui32 table) override {
+ Send(new NSharedCache::TEvMemTableUnregister(table));
+ }
+
+ void CompactionComplete(TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration) override {
+ Send(new NSharedCache::TEvMemTableCompacted(std::move(registration)));
+ }
+
+private:
+ void Send(IEventBase* ev) {
+ ActorSystem->Send(new IEventHandle(SharedCacheId, Owner, ev));
+ }
+
+ TActorSystem* ActorSystem;
+ const TActorId Owner;
+ const TActorId SharedCacheId;
+};
+
TTableSnapshotContext::TTableSnapshotContext() = default;
TTableSnapshotContext::~TTableSnapshotContext() = default;
@@ -367,7 +398,8 @@ void TExecutor::Active(const TActorContext &ctx) {
CommitManager->Start(this, Owner->Tablet(), &Step0, Counters.Get());
- CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(Logger.Get(), Broker.Get(), this, loadedState->Comp,
+ auto sharedPageCacheMemTableObserver = MakeHolder<TSharedPageCacheMemTableObserver>(NActors::TActivationContext::ActorSystem(), SelfId());
+ CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(std::move(sharedPageCacheMemTableObserver), Logger.Get(), Broker.Get(), this, loadedState->Comp,
Sprintf("tablet-%" PRIu64, Owner->TabletID())));
LogicRedo->InstallCounters(Counters.Get(), nullptr);
@@ -3679,6 +3711,22 @@ bool TExecutor::CompactTables() {
}
}
+void TExecutor::Handle(NSharedCache::TEvMemTableRegistered::TPtr &ev) {
+ const auto *msg = ev->Get();
+
+ if (CompactionLogic) {
+ CompactionLogic->ProvideSharedPageCacheMemTableRegistration(msg->Table, std::move(msg->Registration));
+ }
+}
+
+void TExecutor::Handle(NSharedCache::TEvMemTableCompact::TPtr &ev) {
+ const auto *msg = ev->Get();
+
+ if (CompactionLogic) {
+ CompactionLogic->TriggerSharedPageCacheMemTableCompaction(msg->Table, msg->ExpectedSize);
+ }
+}
+
void TExecutor::AllowBorrowedGarbageCompaction(ui32 tableId) {
if (CompactionLogic) {
return CompactionLogic->AllowBorrowedGarbageCompaction(tableId);
@@ -3727,6 +3775,8 @@ STFUNC(TExecutor::StateWork) {
HFunc(NOps::TEvScanStat, Handle);
hFunc(NOps::TEvResult, Handle);
HFunc(NBlockIO::TEvStat, Handle);
+ hFunc(NSharedCache::TEvMemTableRegistered, Handle);
+ hFunc(NSharedCache::TEvMemTableCompact, Handle);
default:
break;
}
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 7e842f8d33e..89064082997 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -645,6 +645,9 @@ public:
ui64 CompactTable(ui32 tableId) override;
bool CompactTables() override;
+ void Handle(NSharedCache::TEvMemTableRegistered::TPtr &ev);
+ void Handle(NSharedCache::TEvMemTableCompact::TPtr &ev);
+
void AllowBorrowedGarbageCompaction(ui32 tableId) override;
void FollowerAttached() override;
diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
index 7714664438e..407d460d1b3 100644
--- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
@@ -17,12 +17,14 @@ TCompactionLogicState::TSnapRequest::~TSnapRequest()
TCompactionLogicState::TTableInfo::~TTableInfo()
{}
-TCompactionLogic::TCompactionLogic(NUtil::ILogger *logger,
+TCompactionLogic::TCompactionLogic(THolder<NSharedCache::ISharedPageCacheMemTableObserver> sharedPageCacheMemTableObserver,
+ NUtil::ILogger *logger,
NTable::IResourceBroker *broker,
NTable::ICompactionBackend *backend,
TAutoPtr<TCompactionLogicState> state,
TString taskNameSuffix)
- : Logger(logger)
+ : SharedPageCacheMemTableObserver(std::move(sharedPageCacheMemTableObserver))
+ , Logger(logger)
, Broker(broker)
, Backend(backend)
, Time(TAppData::TimeProvider.Get())
@@ -192,6 +194,31 @@ ui64 TCompactionLogic::PrepareForceCompaction(ui32 table, EForceCompaction mode)
return tableInfo->CurrentForcedMemCompactionId;
}
+void TCompactionLogic::TriggerSharedPageCacheMemTableCompaction(ui32 table, ui64 expectedSize) {
+ TCompactionLogicState::TTableInfo *tableInfo = State->Tables.FindPtr(table);
+ if (!tableInfo)
+ return;
+
+ auto &inMem = tableInfo->InMem;
+ auto &policy = tableInfo->Policy->BackgroundSnapshotPolicy;
+
+ if (inMem.State == ECompactionState::Free) {
+ if (inMem.EstimatedSize >= expectedSize) {
+ auto priority = tableInfo->ComputeBackgroundPriority(inMem.CompactionTask, policy, 100, Time->Now());
+ SubmitCompactionTask(table, 0, policy.ResourceBrokerTask,
+ priority, inMem.CompactionTask);
+ inMem.State = ECompactionState::PendingBackground;
+ } else {
+ // there was a race and we finished some compaction while our message was waiting
+ // so let's notify that we completed it
+ Y_VERIFY_DEBUG(tableInfo->SharedPageCacheMemTableRegistration);
+ if (auto& registration = tableInfo->SharedPageCacheMemTableRegistration) {
+ SharedPageCacheMemTableObserver->CompactionComplete(registration);
+ }
+ }
+ }
+}
+
TFinishedCompactionInfo TCompactionLogic::GetFinishedCompactionInfo(ui32 table) {
TCompactionLogicState::TTableInfo *tableInfo = State->Tables.FindPtr(table);
if (!tableInfo || !tableInfo->Strategy)
@@ -259,6 +286,10 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges()
if (table.AllowBorrowedGarbageCompaction) {
table.Strategy->AllowBorrowedGarbageCompaction();
}
+
+ if (!table.SharedPageCacheMemTableRegistration) {
+ SharedPageCacheMemTableObserver->Register(info.Id);
+ }
} else {
Y_VERIFY(table.Strategy);
table.Strategy->ReflectSchema();
@@ -277,6 +308,17 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges()
return result;
}
+void TCompactionLogic::ProvideSharedPageCacheMemTableRegistration(ui32 table, TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration)
+{
+ auto *tableInfo = State->Tables.FindPtr(table);
+ if (tableInfo) {
+ registration->SetConsumption(tableInfo->InMem.EstimatedSize);
+
+ Y_VERIFY_DEBUG(!tableInfo->SharedPageCacheMemTableRegistration);
+ tableInfo->SharedPageCacheMemTableRegistration = std::move(registration);
+ }
+}
+
void TCompactionLogic::ReflectRemovedRowVersions(ui32 table)
{
auto *tableInfo = State->Tables.FindPtr(table);
@@ -305,6 +347,9 @@ THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy(
void TCompactionLogic::StopTable(TCompactionLogicState::TTableInfo &table)
{
+ // Note: should be called even without table.SharedPageCacheMemTableRegistration
+ SharedPageCacheMemTableObserver->Unregister(table.TableId);
+
if (table.Strategy) {
// Strategy will cancel all pending and running compactions
table.Strategy->Stop();
@@ -389,6 +434,13 @@ void TCompactionLogic::UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size) {
mem.EstimatedSize = size;
mem.Steps += steps;
+ if (auto& registration = info->SharedPageCacheMemTableRegistration) {
+ registration->SetConsumption(size);
+ if (steps == 0) {
+ SharedPageCacheMemTableObserver->CompactionComplete(registration);
+ }
+ }
+
CheckInMemStats(table);
}
diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.h b/ydb/core/tablet_flat/flat_executor_compaction_logic.h
index 35a903af31b..a476d766f77 100644
--- a/ydb/core/tablet_flat/flat_executor_compaction_logic.h
+++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.h
@@ -5,6 +5,7 @@
#include "flat_store_bundle.h"
#include "flat_exec_broker.h"
#include "logic_redo_eggs.h"
+#include "shared_cache_memtable.h"
#include "util_fmt_line.h"
#include <ydb/core/base/localdb.h>
#include <library/cpp/time_provider/time_provider.h>
@@ -85,6 +86,8 @@ struct TCompactionLogicState {
THolder<NTable::ICompactionStrategy> Strategy;
+ TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> SharedPageCacheMemTableRegistration;
+
TDeque<TSnapRequest> SnapRequests;
TIntrusiveConstPtr<TCompactionPolicy> Policy;
@@ -156,6 +159,7 @@ struct TReflectSchemeChangesResult {
};
class TCompactionLogic {
+ THolder<NSharedCache::ISharedPageCacheMemTableObserver> SharedPageCacheMemTableObserver;
NUtil::ILogger * const Logger;
NTable::IResourceBroker * const Broker;
NTable::ICompactionBackend * const Backend;
@@ -189,6 +193,7 @@ public:
static constexpr ui32 BAD_PRIORITY = Max<ui32>();
TCompactionLogic(
+ THolder<NSharedCache::ISharedPageCacheMemTableObserver> sharedPageCacheMemTableObserver,
NUtil::ILogger*,
NTable::IResourceBroker*,
NTable::ICompactionBackend*,
@@ -217,11 +222,14 @@ public:
bool PrepareForceCompaction();
ui64 PrepareForceCompaction(ui32 table, EForceCompaction mode = EForceCompaction::Full);
+ void TriggerSharedPageCacheMemTableCompaction(ui32 table, ui64 expectedSize);
+
TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 table);
void AllowBorrowedGarbageCompaction(ui32 table);
TReflectSchemeChangesResult ReflectSchemeChanges();
+ void ProvideSharedPageCacheMemTableRegistration(ui32 table, TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration);
void ReflectRemovedRowVersions(ui32 table);
void UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size);
void CheckInMemStats(ui32 table);
diff --git a/ydb/core/tablet_flat/shared_cache_events.h b/ydb/core/tablet_flat/shared_cache_events.h
index a7abe542efc..24bac1a291c 100644
--- a/ydb/core/tablet_flat/shared_cache_events.h
+++ b/ydb/core/tablet_flat/shared_cache_events.h
@@ -3,6 +3,7 @@
#include "defs.h"
#include "flat_bio_events.h"
#include "shared_handle.h"
+#include "shared_cache_memtable.h"
#include <util/generic/map.h>
#include <util/generic/set.h>
@@ -27,6 +28,11 @@ namespace NSharedCache {
EvResult,
EvUpdated,
EvMem,
+ EvMemTableRegister,
+ EvMemTableRegistered,
+ EvMemTableCompact,
+ EvMemTableCompacted,
+ EvMemTableUnregister,
EvEnd
@@ -134,6 +140,49 @@ namespace NSharedCache {
struct TEvMem : public TEventLocal<TEvMem, EvMem> {
};
+ struct TEvMemTableRegister : public TEventLocal<TEvMemTableRegister, EvMemTableRegister> {
+ const ui32 Table;
+
+ TEvMemTableRegister(ui32 table)
+ : Table(table)
+ {}
+ };
+
+ struct TEvMemTableRegistered : public TEventLocal<TEvMemTableRegistered, EvMemTableRegistered> {
+ const ui32 Table;
+ TIntrusivePtr<ISharedPageCacheMemTableRegistration> Registration;
+
+ TEvMemTableRegistered(ui32 table, TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration)
+ : Table(table)
+ , Registration(std::move(registration))
+ {}
+ };
+
+ struct TEvMemTableCompact : public TEventLocal<TEvMemTableCompact, EvMemTableCompact> {
+ const ui32 Table;
+ const ui64 ExpectedSize;
+
+ TEvMemTableCompact(ui32 table, ui64 expectedSize)
+ : Table(table)
+ , ExpectedSize(expectedSize)
+ {}
+ };
+
+ struct TEvMemTableCompacted : public TEventLocal<TEvMemTableCompacted, EvMemTableCompacted> {
+ const TIntrusivePtr<ISharedPageCacheMemTableRegistration> Registration;
+
+ TEvMemTableCompacted(TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration)
+ : Registration(registration)
+ {}
+ };
+
+ struct TEvMemTableUnregister : public TEventLocal<TEvMemTableUnregister, EvMemTableUnregister> {
+ const ui32 Table;
+
+ TEvMemTableUnregister(ui32 table)
+ : Table(table)
+ {}
+ };
}
}
diff --git a/ydb/core/tablet_flat/shared_cache_memtable.h b/ydb/core/tablet_flat/shared_cache_memtable.h
new file mode 100644
index 00000000000..e78682bed92
--- /dev/null
+++ b/ydb/core/tablet_flat/shared_cache_memtable.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr {
+namespace NSharedCache {
+
+class ISharedPageCacheMemTableRegistration : TNonCopyable, public TThrRefBase {
+public:
+ virtual void SetConsumption(ui64 newConsumption) = 0;
+ virtual ~ISharedPageCacheMemTableRegistration() = default;
+};
+
+class ISharedPageCacheMemTableObserver : TNonCopyable {
+public:
+ virtual void Register(ui32 table) = 0;
+ virtual void Unregister(ui32 table) = 0;
+ virtual void CompactionComplete(TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration) = 0;
+ virtual ~ISharedPageCacheMemTableObserver() = default;
+};
+
+}
+}
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp
index 40d86df5fdc..a35e80af211 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/shared_sausagecache.cpp
@@ -27,6 +27,8 @@ TSharedPageCacheCounters::TSharedPageCacheCounters(const TIntrusivePtr<::NMonito
, CacheMissBytes(group->GetCounter("CacheMissBytes", true))
, LoadInFlyPages(group->GetCounter("LoadInFlyPages"))
, LoadInFlyBytes(group->GetCounter("LoadInFlyBytes"))
+ , MemTableTotalBytes(group->GetCounter("MemTableTotalBytes"))
+ , MemTableCompactingBytes(group->GetCounter("MemTableCompactingBytes"))
{ }
}
@@ -41,6 +43,147 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) {
return false;
}
+class TSharedPageCacheMemTableTracker;
+
+class TSharedPageCacheMemTableRegistration : public NSharedCache::ISharedPageCacheMemTableRegistration {
+public:
+ TSharedPageCacheMemTableRegistration(TActorId owner, ui32 table, std::weak_ptr<TSharedPageCacheMemTableTracker> tracker)
+ : Tracker(std::move(tracker))
+ , Owner(owner)
+ , Table(table)
+ {}
+
+ ui64 GetConsumption() const {
+ return Consumption.load();
+ }
+
+ void SetConsumption(ui64 newConsumption);
+
+private:
+ std::weak_ptr<TSharedPageCacheMemTableTracker> Tracker;
+ std::atomic<ui64> Consumption = 0;
+
+public:
+ const TActorId Owner;
+ const ui32 Table;
+};
+
+class TSharedPageCacheMemTableTracker : public std::enable_shared_from_this<TSharedPageCacheMemTableTracker> {
+ friend TSharedPageCacheMemTableRegistration;
+
+public:
+ TSharedPageCacheMemTableTracker(TIntrusivePtr<TSharedPageCacheCounters> counters)
+ : Counters(counters)
+ {}
+
+ ui64 GetTotalConsumption() {
+ return TotalConsumption.load();
+ }
+
+ ui64 GetTotalCompacting() {
+ return TotalCompacting;
+ }
+
+ TIntrusivePtr<TSharedPageCacheMemTableRegistration> Register(TActorId owner, ui32 table) {
+ auto &ret = Registrations[{owner, table}];
+ if (!ret) {
+ ret = MakeIntrusive<TSharedPageCacheMemTableRegistration>(owner, table, weak_from_this());
+ NonCompacting.insert(ret);
+ }
+ return ret;
+ }
+
+ void Unregister(TActorId owner, ui32 table) {
+ auto it = Registrations.find({owner, table});
+ if (it != Registrations.end()) {
+ auto& registration = it->second;
+ CompactionComplete(registration);
+ registration->SetConsumption(0);
+ Y_VERIFY_DEBUG(NonCompacting.contains(registration));
+ NonCompacting.erase(registration);
+ Registrations.erase(it);
+ }
+ }
+
+ void CompactionComplete(TIntrusivePtr<TSharedPageCacheMemTableRegistration> registration) {
+ auto it = Compacting.find(registration);
+ if (it != Compacting.end()) {
+ ChangeTotalCompacting(-it->second);
+ NonCompacting.insert(it->first);
+ Compacting.erase(it);
+ }
+ }
+
+ /**
+ * @return registrations and sizes that should be compacted
+ */
+ TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> SelectForCompaction(ui64 toCompact) {
+ TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> ret;
+
+ if (toCompact <= TotalCompacting) {
+ // nothing to compact more
+ return ret;
+ }
+
+ for (const auto &r : NonCompacting) {
+ ui64 consumption = r->GetConsumption();
+ if (consumption) {
+ ret.emplace_back(r, consumption);
+ }
+ }
+
+ Sort(ret, [](const auto &x, const auto &y) { return x.second > y.second; });
+
+ size_t take = 0;
+ for (auto it = ret.begin(); it != ret.end() && toCompact > TotalCompacting; it++) {
+ auto reg = it->first;
+
+ Compacting[reg] = it->second;
+ Y_VERIFY(NonCompacting.erase(reg));
+
+ ChangeTotalCompacting(it->second);
+
+ take++;
+ }
+
+ ret.resize(take);
+ return ret;
+ }
+
+private:
+ void ChangeTotalConsumption(ui64 delta) {
+ TotalConsumption.fetch_add(delta);
+ if (Counters) {
+ Counters->MemTableTotalBytes->Add(delta);
+ }
+ }
+
+ void ChangeTotalCompacting(ui64 delta) {
+ TotalCompacting += delta;
+ if (Counters) {
+ Counters->MemTableCompactingBytes->Add(delta);
+ }
+ }
+
+private:
+ TIntrusivePtr<TSharedPageCacheCounters> Counters;
+ TMap<std::pair<TActorId, ui32>, TIntrusivePtr<TSharedPageCacheMemTableRegistration>> Registrations;
+ THashSet<TIntrusivePtr<TSharedPageCacheMemTableRegistration>> NonCompacting;
+ THashMap<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64> Compacting;
+ std::atomic<ui64> TotalConsumption = 0;
+ // Approximate value, updates only on compaction start/stop.
+ //
+ // Counts only Shared Cache triggered compactions.
+ ui64 TotalCompacting = 0;
+};
+
+void TSharedPageCacheMemTableRegistration::SetConsumption(ui64 newConsumption) {
+ ui64 before = Consumption.exchange(newConsumption);
+ if (auto t = Tracker.lock()) {
+ t->ChangeTotalConsumption(newConsumption - before);
+ }
+}
+
class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
using ELnLev = NUtil::ELnLev;
using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>;
@@ -165,6 +308,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
THashMap<TLogoBlobID, TCollection> Collections;
THashMap<TActorId, TCollectionsOwner> CollectionsOwners;
TIntrusivePtr<TMemObserver> MemObserver;
+ std::shared_ptr<TSharedPageCacheMemTableTracker> MemTableTracker;
TRequestQueue AsyncRequests;
TRequestQueue ScanRequests;
@@ -258,6 +402,37 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
ActualizeCacheSizeLimit();
DoGC();
+
+ if (MemLimitBytes && MemLimitBytes < ConfigLimitBytes) {
+ // in normal scenario we expect that we can fill the whole shared cache
+ ui64 memTableReservedBytes = ConfigLimitBytes * Config->MemTableReservationPercent / 100;
+ ui64 memTableTotal = MemTableTracker->GetTotalConsumption();
+ if (memTableTotal > memTableReservedBytes) {
+ ui64 toCompact = Min(ConfigLimitBytes - MemLimitBytes, memTableTotal - memTableReservedBytes);
+ auto registrations = MemTableTracker->SelectForCompaction(toCompact);
+ for (auto registration : registrations) {
+ Send(registration.first->Owner, new NSharedCache::TEvMemTableCompact(registration.first->Table, registration.second));
+ }
+ }
+ }
+ }
+
+ void Handle(NSharedCache::TEvMemTableRegister::TPtr &ev) {
+ const auto *msg = ev->Get();
+ auto registration = MemTableTracker->Register(ev->Sender, msg->Table);
+ Send(ev->Sender, new NSharedCache::TEvMemTableRegistered(msg->Table, std::move(registration)));
+ }
+
+ void Handle(NSharedCache::TEvMemTableUnregister::TPtr &ev) {
+ const auto *msg = ev->Get();
+ MemTableTracker->Unregister(ev->Sender, msg->Table);
+ }
+
+ void Handle(NSharedCache::TEvMemTableCompacted::TPtr &ev) {
+ const auto *msg = ev->Get();
+ if (auto registration = dynamic_cast<TSharedPageCacheMemTableRegistration*>(msg->Registration.Get())) {
+ MemTableTracker->CompactionComplete(registration);
+ }
}
void Registered(TActorSystem *sys, const TActorId &owner)
@@ -1044,6 +1219,9 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
if (msg->Record.HasActivePagesReservationPercent()) {
Config->ActivePagesReservationPercent = msg->Record.GetActivePagesReservationPercent();
}
+ if (msg->Record.HasMemTableReservationPercent()) {
+ Config->MemTableReservationPercent = msg->Record.GetMemTableReservationPercent();
+ }
if (msg->Record.GetAsyncQueueInFlyLimit() != 0) {
AsyncRequests.Limit = msg->Record.GetAsyncQueueInFlyLimit();
@@ -1114,6 +1292,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
public:
TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver)
: MemObserver(std::move(memObserver))
+ , MemTableTracker(std::make_shared<TSharedPageCacheMemTableTracker>(config->Counters))
, Config(std::move(config))
, Cache(*Config->CacheConfig)
, SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>())
@@ -1139,6 +1318,10 @@ public:
hFunc(NSharedCache::TEvUnregister, Handle);
hFunc(NSharedCache::TEvInvalidate, Handle);
hFunc(NSharedCache::TEvMem, Handle);
+ hFunc(NSharedCache::TEvMemTableRegister, Handle);
+ hFunc(NSharedCache::TEvMemTableUnregister, Handle);
+ hFunc(NSharedCache::TEvMemTableCompacted, Handle);
+
hFunc(NBlockIO::TEvData, Handle);
hFunc(TEvSharedPageCache::TEvConfigure, Handle);
cFunc(TEvents::TSystem::Wakeup, GCWakeup);
diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h
index 28ae0825b4b..ce8245f07af 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.h
+++ b/ydb/core/tablet_flat/shared_sausagecache.h
@@ -38,6 +38,8 @@ struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheC
const TCounterPtr CacheMissBytes;
const TCounterPtr LoadInFlyPages;
const TCounterPtr LoadInFlyBytes;
+ const TCounterPtr MemTableTotalBytes;
+ const TCounterPtr MemTableCompactingBytes;
explicit TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group);
};
@@ -49,6 +51,7 @@ struct TSharedPageCacheConfig {
TString CacheName = "SharedPageCache";
TIntrusivePtr<TSharedPageCacheCounters> Counters;
ui32 ActivePagesReservationPercent = 50;
+ ui32 MemTableReservationPercent = 20;
};
IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver);
diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h
index 5361c1f0c29..44557d41d1e 100644
--- a/ydb/core/tablet_flat/test/libs/exec/runner.h
+++ b/ydb/core/tablet_flat/test/libs/exec/runner.h
@@ -185,7 +185,7 @@ namespace NFake {
config->TotalScanQueueInFlyLimit = conf.ScanQueue;
config->Counters = MakeIntrusive<TSharedPageCacheCounters>(Env.GetDynamicCounters());
- auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver());
+ auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver());
RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled);
}
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
index f5e5f0917de..ee2423e7d48 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
@@ -72,6 +72,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
index 4723dfaccdf..24d732f728a 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
@@ -75,6 +75,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
index 233cb18aa6b..449b23320ce 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
@@ -76,6 +76,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
index b8a23593306..2b3e5a8d7f1 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
@@ -65,6 +65,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
index 26bdf4f71ee..00944cfb156 100644
--- a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
@@ -89,16 +89,20 @@ struct TTxReadRow : public ITransaction {
};
void LogCounters(TIntrusivePtr<TSharedPageCacheCounters> counters) {
- Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val() << ", Passive:" << counters->PassiveBytes->Val() << ", MemLimit:" << counters->MemLimitBytes->Val() << Endl;
+ Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val()
+ << ", Passive:" << counters->PassiveBytes->Val()
+ << ", MemLimit:" << counters->MemLimitBytes->Val()
+ << ", MemTable:" << counters->MemTableCompactingBytes->Val() << "/" << counters->MemTableTotalBytes->Val()
+ << Endl;
}
-void WaitMemEvent(TMyEnvBase& env) {
+void WaitEvent(TMyEnvBase& env, ui32 eventType, ui32 requiredCount = 1) {
TDispatchOptions options;
- options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1));
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(eventType, requiredCount));
env->DispatchEvents(options);
}
-Y_UNIT_TEST(Limits) {
+Y_UNIT_TEST(PageCacheLimits) {
TMyEnvBase env;
auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters());
@@ -121,38 +125,38 @@ Y_UNIT_TEST(Limits) {
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0);
- UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); // 2 full layers (fresh & staging)
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3); // 2 full layers (fresh & staging)
UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 0);
env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB});
- WaitMemEvent(env);
+ WaitEvent(env, NSharedCache::EvMem);
LogCounters(counters);
- UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3);
UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val());
env->GetMemObserver()->NotifyStat({101*MB, 110*MB, 100*MB});
- WaitMemEvent(env);
+ WaitEvent(env, NSharedCache::EvMem);
LogCounters(counters);
- UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); // 1mb evicted
- UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3); // 1mb evicted
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3);
UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), counters->ActiveLimitBytes->Val());
env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB});
- WaitMemEvent(env);
+ WaitEvent(env, NSharedCache::EvMem);
LogCounters(counters);
- UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3);
UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772);
UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); // +5mb
env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB});
- WaitMemEvent(env);
+ WaitEvent(env, NSharedCache::EvMem);
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(counters->ActiveBytes->Val(), 0);
UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit
@@ -160,6 +164,105 @@ Y_UNIT_TEST(Limits) {
UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit
}
+Y_UNIT_TEST(MemTableRegistration) {
+ TMyEnvBase env;
+
+ // start:
+
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+ env.SendAsync(new NFake::TEvExecute{ new TTxInitSchema() });
+ env.SendAsync(new NFake::TEvExecute{ new TTxWriteRow(1, "asdf") });
+
+ WaitEvent(env, NSharedCache::EvMemTableRegistered);
+
+ // stop:
+
+ env.SendAsync(new TEvents::TEvPoison);
+
+ WaitEvent(env, NSharedCache::EvUnregister);
+
+ // restart:
+
+ env.FireTablet(env.Edge, env.Tablet, [&](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new NFake::TDummy(tablet, info, env.Edge, 0);
+ });
+
+ WaitEvent(env, NSharedCache::EvMemTableRegistered);
+}
+
+Y_UNIT_TEST(MemTableLimits) {
+ TMyEnvBase env;
+
+ env->SetLogPriority(NKikimrServices::OPS_COMPACT, NActors::NLog::PRI_INFO);
+
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters());
+
+ TVector<ui64> tabletIds;
+ for (size_t tablet = 0; tablet < 10; tablet++) {
+ tabletIds.push_back(env.Tablet);
+ env.Tablet++;
+ }
+
+ for (auto tabletId : tabletIds) {
+ env.Tablet = tabletId;
+
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+ env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() });
+
+ // write 10 rows, each ~100KB
+ for (i64 key = 0; key < 10; ++key) {
+ TString value(size_t(100 * 1024), char('a' + key % 26));
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) });
+ }
+
+ Cerr << "...compacting" << Endl;
+ env.SendSync(new NFake::TEvCompact(TableId));
+ Cerr << "...waiting until compacted" << Endl;
+ env.WaitFor<NFake::TEvCompacted>();
+
+ for (i64 key = 0; key < 10; ++key) {
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key) });
+ }
+
+ // write 10 rows, each ~50KB
+ for (i64 key = 100; key < 110; ++key) {
+ TString value(size_t(50 * 1024), char('a' + key % 26));
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) });
+ }
+ }
+
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 5*MB, MB/ 3);
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemTableCompactingBytes->Val(), 0);
+
+ env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB});
+ WaitEvent(env, NSharedCache::EvMem);
+ LogCounters(counters);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB);
+ UNIT_ASSERT_GE(counters->MemLimitBytes->Val(), 8*MB); // bigger than config value
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 5*MB, MB/ 3);
+
+ env->GetMemObserver()->NotifyStat({100*MB + 1, 110*MB, 100*MB});
+ WaitEvent(env, NSharedCache::EvMem);
+ LogCounters(counters);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2, MB/ 3); // 1 page evicted
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->MemLimitBytes->Val(), 8*MB / 3 * 2, MB/ 3);
+
+ // we want to get ~2.7MB back, it's 6 Mem Tables
+ env.WaitFor<NFake::TEvCompacted>(6);
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 2*MB, MB/ 3);
+
+ env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB});
+ WaitEvent(env, NSharedCache::EvMem);
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit
+
+ // compacted all except reservation, it's all except 20% of 8MB = 1.6MB = 3 Mem Tables
+ env.WaitFor<NFake::TEvCompacted>(1); // so 1 more Mem Table compacted
+ UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 1.5*MB, MB/ 3);
+}
+
} // Y_UNIT_TEST_SUITE(TSharedPageCache)
} // namespace NTabletFlatExecutor
diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
new file mode 100644
index 00000000000..5f10d679622
--- /dev/null
+++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp
@@ -0,0 +1,374 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include "shared_sausagecache.cpp"
+
+namespace NKikimr {
+namespace NTabletFlatExecutor {
+Y_UNIT_TEST_SUITE(TSharedPageCacheMemTableTracker) {
+
+#define VERIFY_TOTAL(expected) \
+do { \
+ UNIT_ASSERT_VALUES_EQUAL(tracker->GetTotalConsumption(), expected); \
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemTableTotalBytes->Val(), i64(expected)); \
+} while (false)
+
+#define VERIFY_COMPACTING(expected) \
+do { \
+ UNIT_ASSERT_VALUES_EQUAL(tracker->GetTotalCompacting(), expected); \
+ UNIT_ASSERT_VALUES_EQUAL(counters->MemTableCompactingBytes->Val(), i64(expected)); \
+} while (false)
+
+Y_UNIT_TEST(Empty) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT(tracker->SelectForCompaction(100).empty());
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+}
+
+Y_UNIT_TEST(Destruction) {
+ std::weak_ptr<TSharedPageCacheMemTableTracker> wTracker;
+
+ {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+ wTracker = tracker;
+
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+ registration->SetConsumption(1000);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+ }
+
+ UNIT_ASSERT(!wTracker.lock());
+}
+
+Y_UNIT_TEST(Register) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ { // non-compacting registration
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+ registration->SetConsumption(1000);
+
+ // weak pointer doesn't use
+ UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3);
+ tracker->Unregister(TActorId{1, 10}, 100);
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ registration.Reset();
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+ }
+
+ { // compacting registration
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+ registration->SetConsumption(1000);
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1);
+
+ // weak pointer doesn't use
+ UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(1000);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3);
+ tracker->Unregister(TActorId{1, 10}, 100);
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ registration.Reset();
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+ }
+
+ { // duplicate registration
+ auto registration1 = tracker->Register(TActorId{1, 10}, 100);
+ registration1->SetConsumption(1000);
+
+ auto registration2 = tracker->Register(TActorId{1, 10}, 100);
+ UNIT_ASSERT_VALUES_EQUAL(registration1, registration2);
+
+ // weak pointer doesn't use
+ UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1);
+
+ registration2->SetConsumption(2000);
+ VERIFY_TOTAL(2000);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration2->RefCount(), 4);
+ tracker->Unregister(TActorId{1, 10}, 100);
+ UNIT_ASSERT_VALUES_EQUAL(registration2->RefCount(), 2);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ registration1.Reset();
+ registration2.Reset();
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+ }
+}
+
+Y_UNIT_TEST(Unregister) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ { // double unregister
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+ registration->SetConsumption(1000);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1);
+ }
+
+ { // unregister and register back
+ auto registration1 = tracker->Register(TActorId{1, 10}, 100);
+ registration1->SetConsumption(1000);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ auto registration2 = tracker->Register(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ registration2->SetConsumption(2000);
+
+ VERIFY_TOTAL(2000);
+ VERIFY_COMPACTING(0);
+ }
+
+ { // someone else unregister
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+ registration->SetConsumption(1000);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ tracker->Unregister(TActorId{1, 10}, 200);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1);
+ }
+}
+
+Y_UNIT_TEST(SetConsumption) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ { // non-compacting updates
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+
+ registration->SetConsumption(1000);
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ registration->SetConsumption(2000);
+ VERIFY_TOTAL(2000);
+ VERIFY_COMPACTING(0);
+
+ registration->SetConsumption(999);
+ VERIFY_TOTAL(999);
+ VERIFY_COMPACTING(0);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+ }
+
+ { // compacting updates
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+
+ registration->SetConsumption(1000);
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1);
+
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(1000);
+
+ registration->SetConsumption(2000);
+ VERIFY_TOTAL(2000);
+ VERIFY_COMPACTING(1000);
+
+ registration->SetConsumption(999);
+ VERIFY_TOTAL(999);
+ VERIFY_COMPACTING(1000);
+
+ tracker->Unregister(TActorId{1, 10}, 100);
+
+ VERIFY_TOTAL(0);
+ VERIFY_COMPACTING(0);
+ }
+}
+
+Y_UNIT_TEST(CompactionComplete) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ {
+ auto registration = tracker->Register(TActorId{1, 10}, 100);
+
+ registration->SetConsumption(1000);
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1);
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(1000);
+
+ tracker->CompactionComplete(registration);
+ VERIFY_TOTAL(1000);
+ VERIFY_COMPACTING(0);
+ }
+}
+
+Y_UNIT_TEST(SelectForCompaction) {
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+ auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters);
+
+ TVector<TIntrusivePtr<TSharedPageCacheMemTableRegistration>> registrations;
+ for (size_t i = 0; i < 3; i++) {
+ registrations.push_back(tracker->Register(TActorId{1, 10}, 100 + i));
+ }
+
+ auto reset = [&]() {
+ registrations[0]->SetConsumption(3);
+ registrations[1]->SetConsumption(100);
+ registrations[2]->SetConsumption(20);
+
+ for (size_t i = 0; i < 3; i++) {
+ tracker->CompactionComplete(registrations[i]);
+ }
+
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(0);
+ };
+
+ TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> expected;
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(100);
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(100);
+
+ reset();
+ expected = {{registrations[1], 100}, {registrations[2], 20}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(101), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(120);
+
+ reset();
+ expected = {{registrations[1], 100}, {registrations[2], 20}, {registrations[0], 3}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(123), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(123);
+
+ reset();
+ expected = {{registrations[1], 100}, {registrations[2], 20}, {registrations[0], 3}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(999), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(123);
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100).size(), 0); // compacts max, not sum
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(100);
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ expected = {{registrations[2], 20}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(101), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(120);
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(100);
+ tracker->CompactionComplete(registrations[1]);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(0);
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ VERIFY_TOTAL(123);
+ VERIFY_COMPACTING(100);
+
+ reset();
+ expected = {{registrations[1], 100}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected);
+ registrations[1]->SetConsumption(1000);
+ VERIFY_TOTAL(1023);
+ VERIFY_COMPACTING(100); // didn't update
+
+ reset();
+ registrations[2]->SetConsumption(0);
+ expected = {{registrations[1], 100}, {registrations[0], 3}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(999), expected);
+ VERIFY_TOTAL(103);
+ VERIFY_COMPACTING(103);
+
+ reset();
+ tracker->Unregister(TActorId{1, 10}, 101);
+ expected = {{registrations[2], 20}, {registrations[0], 3}};
+ UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100), expected);
+ VERIFY_TOTAL(23);
+ VERIFY_COMPACTING(23);
+ registrations[1] = tracker->Register(TActorId{1, 10}, 101);
+}
+
+}
+}
+}
diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make
index aeae789e3a4..b26924453a6 100644
--- a/ydb/core/tablet_flat/ut/ya.make
+++ b/ydb/core/tablet_flat/ut/ya.make
@@ -48,6 +48,7 @@ SRCS(
ut_screen.cpp
ut_bloom.cpp
ut_shared_sausagecache.cpp
+ ut_shared_sausagecache_memtable.cpp
ut_slice.cpp
ut_slice_loader.cpp
ut_versions.cpp
diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make
index ad1c4fcfe08..4af18689526 100644
--- a/ydb/core/tablet_flat/ya.make
+++ b/ydb/core/tablet_flat/ya.make
@@ -66,6 +66,7 @@ SRCS(
shared_handle.cpp
shared_sausagecache.cpp
shared_sausagecache.h
+ shared_cache_memtable.h
tablet_flat_executor.h
tablet_flat_executor.cpp
tablet_flat_executed.h