diff options
author | kungasc <kungasc@yandex-team.com> | 2023-07-26 13:26:24 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-07-26 13:26:24 +0300 |
commit | 2c2762d53cba845d7628d73b4a5077e026816f8b (patch) | |
tree | 344322e96a280f24499339d3e54f5eaf73554170 | |
parent | 2ccacfcb71600c3a39b95a255753c0335dbfc95d (diff) | |
download | ydb-2c2762d53cba845d7628d73b4a5077e026816f8b.tar.gz |
KIKIMR-18716 Limit MemTable total consumption
19 files changed, 879 insertions, 17 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c790d982684..9857c6fbb5c 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1128,6 +1128,13 @@ void TSharedCacheInitializer::InitializeServices( config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit(); config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit(); + if (cfg.HasActivePagesReservationPercent()) { + config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent(); + } + if (cfg.HasMemTableReservationPercent()) { + config->MemTableReservationPercent = cfg.GetMemTableReservationPercent(); + } + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE"); diff --git a/ydb/core/protos/shared_cache.proto b/ydb/core/protos/shared_cache.proto index 8cca85c0f90..33758ea0980 100644 --- a/ydb/core/protos/shared_cache.proto +++ b/ydb/core/protos/shared_cache.proto @@ -6,4 +6,5 @@ message TSharedCacheConfig { optional uint64 ScanQueueInFlyLimit = 2 [default = 536870912]; optional uint64 AsyncQueueInFlyLimit = 3 [default = 536870912]; optional uint32 ActivePagesReservationPercent = 4; + optional uint32 MemTableReservationPercent = 5; } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 449908f81e4..4851cde9e50 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -23,6 +23,7 @@ #include "flat_abi_evol.h" #include "probes.h" #include "shared_sausagecache.h" +#include "shared_cache_memtable.h" #include "util_fmt_desc.h" #include <ydb/core/base/appdata.h> @@ -57,6 +58,36 @@ struct TCompactionChangesCtx { { } }; +class TSharedPageCacheMemTableObserver : public NSharedCache::ISharedPageCacheMemTableObserver { +public: + TSharedPageCacheMemTableObserver(TActorSystem* actorSystem, TActorId owner) + : ActorSystem(actorSystem) + , Owner(owner) + , SharedCacheId(MakeSharedPageCacheId()) + {} + + void Register(ui32 table) override { + Send(new NSharedCache::TEvMemTableRegister(table)); + } + + void Unregister(ui32 table) override { + Send(new NSharedCache::TEvMemTableUnregister(table)); + } + + void CompactionComplete(TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration) override { + Send(new NSharedCache::TEvMemTableCompacted(std::move(registration))); + } + +private: + void Send(IEventBase* ev) { + ActorSystem->Send(new IEventHandle(SharedCacheId, Owner, ev)); + } + + TActorSystem* ActorSystem; + const TActorId Owner; + const TActorId SharedCacheId; +}; + TTableSnapshotContext::TTableSnapshotContext() = default; TTableSnapshotContext::~TTableSnapshotContext() = default; @@ -367,7 +398,8 @@ void TExecutor::Active(const TActorContext &ctx) { CommitManager->Start(this, Owner->Tablet(), &Step0, Counters.Get()); - CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(Logger.Get(), Broker.Get(), this, loadedState->Comp, + auto sharedPageCacheMemTableObserver = MakeHolder<TSharedPageCacheMemTableObserver>(NActors::TActivationContext::ActorSystem(), SelfId()); + CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(std::move(sharedPageCacheMemTableObserver), Logger.Get(), Broker.Get(), this, loadedState->Comp, Sprintf("tablet-%" PRIu64, Owner->TabletID()))); LogicRedo->InstallCounters(Counters.Get(), nullptr); @@ -3679,6 +3711,22 @@ bool TExecutor::CompactTables() { } } +void TExecutor::Handle(NSharedCache::TEvMemTableRegistered::TPtr &ev) { + const auto *msg = ev->Get(); + + if (CompactionLogic) { + CompactionLogic->ProvideSharedPageCacheMemTableRegistration(msg->Table, std::move(msg->Registration)); + } +} + +void TExecutor::Handle(NSharedCache::TEvMemTableCompact::TPtr &ev) { + const auto *msg = ev->Get(); + + if (CompactionLogic) { + CompactionLogic->TriggerSharedPageCacheMemTableCompaction(msg->Table, msg->ExpectedSize); + } +} + void TExecutor::AllowBorrowedGarbageCompaction(ui32 tableId) { if (CompactionLogic) { return CompactionLogic->AllowBorrowedGarbageCompaction(tableId); @@ -3727,6 +3775,8 @@ STFUNC(TExecutor::StateWork) { HFunc(NOps::TEvScanStat, Handle); hFunc(NOps::TEvResult, Handle); HFunc(NBlockIO::TEvStat, Handle); + hFunc(NSharedCache::TEvMemTableRegistered, Handle); + hFunc(NSharedCache::TEvMemTableCompact, Handle); default: break; } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 7e842f8d33e..89064082997 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -645,6 +645,9 @@ public: ui64 CompactTable(ui32 tableId) override; bool CompactTables() override; + void Handle(NSharedCache::TEvMemTableRegistered::TPtr &ev); + void Handle(NSharedCache::TEvMemTableCompact::TPtr &ev); + void AllowBorrowedGarbageCompaction(ui32 tableId) override; void FollowerAttached() override; diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp index 7714664438e..407d460d1b3 100644 --- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp +++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp @@ -17,12 +17,14 @@ TCompactionLogicState::TSnapRequest::~TSnapRequest() TCompactionLogicState::TTableInfo::~TTableInfo() {} -TCompactionLogic::TCompactionLogic(NUtil::ILogger *logger, +TCompactionLogic::TCompactionLogic(THolder<NSharedCache::ISharedPageCacheMemTableObserver> sharedPageCacheMemTableObserver, + NUtil::ILogger *logger, NTable::IResourceBroker *broker, NTable::ICompactionBackend *backend, TAutoPtr<TCompactionLogicState> state, TString taskNameSuffix) - : Logger(logger) + : SharedPageCacheMemTableObserver(std::move(sharedPageCacheMemTableObserver)) + , Logger(logger) , Broker(broker) , Backend(backend) , Time(TAppData::TimeProvider.Get()) @@ -192,6 +194,31 @@ ui64 TCompactionLogic::PrepareForceCompaction(ui32 table, EForceCompaction mode) return tableInfo->CurrentForcedMemCompactionId; } +void TCompactionLogic::TriggerSharedPageCacheMemTableCompaction(ui32 table, ui64 expectedSize) { + TCompactionLogicState::TTableInfo *tableInfo = State->Tables.FindPtr(table); + if (!tableInfo) + return; + + auto &inMem = tableInfo->InMem; + auto &policy = tableInfo->Policy->BackgroundSnapshotPolicy; + + if (inMem.State == ECompactionState::Free) { + if (inMem.EstimatedSize >= expectedSize) { + auto priority = tableInfo->ComputeBackgroundPriority(inMem.CompactionTask, policy, 100, Time->Now()); + SubmitCompactionTask(table, 0, policy.ResourceBrokerTask, + priority, inMem.CompactionTask); + inMem.State = ECompactionState::PendingBackground; + } else { + // there was a race and we finished some compaction while our message was waiting + // so let's notify that we completed it + Y_VERIFY_DEBUG(tableInfo->SharedPageCacheMemTableRegistration); + if (auto& registration = tableInfo->SharedPageCacheMemTableRegistration) { + SharedPageCacheMemTableObserver->CompactionComplete(registration); + } + } + } +} + TFinishedCompactionInfo TCompactionLogic::GetFinishedCompactionInfo(ui32 table) { TCompactionLogicState::TTableInfo *tableInfo = State->Tables.FindPtr(table); if (!tableInfo || !tableInfo->Strategy) @@ -259,6 +286,10 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges() if (table.AllowBorrowedGarbageCompaction) { table.Strategy->AllowBorrowedGarbageCompaction(); } + + if (!table.SharedPageCacheMemTableRegistration) { + SharedPageCacheMemTableObserver->Register(info.Id); + } } else { Y_VERIFY(table.Strategy); table.Strategy->ReflectSchema(); @@ -277,6 +308,17 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges() return result; } +void TCompactionLogic::ProvideSharedPageCacheMemTableRegistration(ui32 table, TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration) +{ + auto *tableInfo = State->Tables.FindPtr(table); + if (tableInfo) { + registration->SetConsumption(tableInfo->InMem.EstimatedSize); + + Y_VERIFY_DEBUG(!tableInfo->SharedPageCacheMemTableRegistration); + tableInfo->SharedPageCacheMemTableRegistration = std::move(registration); + } +} + void TCompactionLogic::ReflectRemovedRowVersions(ui32 table) { auto *tableInfo = State->Tables.FindPtr(table); @@ -305,6 +347,9 @@ THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy( void TCompactionLogic::StopTable(TCompactionLogicState::TTableInfo &table) { + // Note: should be called even without table.SharedPageCacheMemTableRegistration + SharedPageCacheMemTableObserver->Unregister(table.TableId); + if (table.Strategy) { // Strategy will cancel all pending and running compactions table.Strategy->Stop(); @@ -389,6 +434,13 @@ void TCompactionLogic::UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size) { mem.EstimatedSize = size; mem.Steps += steps; + if (auto& registration = info->SharedPageCacheMemTableRegistration) { + registration->SetConsumption(size); + if (steps == 0) { + SharedPageCacheMemTableObserver->CompactionComplete(registration); + } + } + CheckInMemStats(table); } diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.h b/ydb/core/tablet_flat/flat_executor_compaction_logic.h index 35a903af31b..a476d766f77 100644 --- a/ydb/core/tablet_flat/flat_executor_compaction_logic.h +++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.h @@ -5,6 +5,7 @@ #include "flat_store_bundle.h" #include "flat_exec_broker.h" #include "logic_redo_eggs.h" +#include "shared_cache_memtable.h" #include "util_fmt_line.h" #include <ydb/core/base/localdb.h> #include <library/cpp/time_provider/time_provider.h> @@ -85,6 +86,8 @@ struct TCompactionLogicState { THolder<NTable::ICompactionStrategy> Strategy; + TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> SharedPageCacheMemTableRegistration; + TDeque<TSnapRequest> SnapRequests; TIntrusiveConstPtr<TCompactionPolicy> Policy; @@ -156,6 +159,7 @@ struct TReflectSchemeChangesResult { }; class TCompactionLogic { + THolder<NSharedCache::ISharedPageCacheMemTableObserver> SharedPageCacheMemTableObserver; NUtil::ILogger * const Logger; NTable::IResourceBroker * const Broker; NTable::ICompactionBackend * const Backend; @@ -189,6 +193,7 @@ public: static constexpr ui32 BAD_PRIORITY = Max<ui32>(); TCompactionLogic( + THolder<NSharedCache::ISharedPageCacheMemTableObserver> sharedPageCacheMemTableObserver, NUtil::ILogger*, NTable::IResourceBroker*, NTable::ICompactionBackend*, @@ -217,11 +222,14 @@ public: bool PrepareForceCompaction(); ui64 PrepareForceCompaction(ui32 table, EForceCompaction mode = EForceCompaction::Full); + void TriggerSharedPageCacheMemTableCompaction(ui32 table, ui64 expectedSize); + TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 table); void AllowBorrowedGarbageCompaction(ui32 table); TReflectSchemeChangesResult ReflectSchemeChanges(); + void ProvideSharedPageCacheMemTableRegistration(ui32 table, TIntrusivePtr<NSharedCache::ISharedPageCacheMemTableRegistration> registration); void ReflectRemovedRowVersions(ui32 table); void UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size); void CheckInMemStats(ui32 table); diff --git a/ydb/core/tablet_flat/shared_cache_events.h b/ydb/core/tablet_flat/shared_cache_events.h index a7abe542efc..24bac1a291c 100644 --- a/ydb/core/tablet_flat/shared_cache_events.h +++ b/ydb/core/tablet_flat/shared_cache_events.h @@ -3,6 +3,7 @@ #include "defs.h" #include "flat_bio_events.h" #include "shared_handle.h" +#include "shared_cache_memtable.h" #include <util/generic/map.h> #include <util/generic/set.h> @@ -27,6 +28,11 @@ namespace NSharedCache { EvResult, EvUpdated, EvMem, + EvMemTableRegister, + EvMemTableRegistered, + EvMemTableCompact, + EvMemTableCompacted, + EvMemTableUnregister, EvEnd @@ -134,6 +140,49 @@ namespace NSharedCache { struct TEvMem : public TEventLocal<TEvMem, EvMem> { }; + struct TEvMemTableRegister : public TEventLocal<TEvMemTableRegister, EvMemTableRegister> { + const ui32 Table; + + TEvMemTableRegister(ui32 table) + : Table(table) + {} + }; + + struct TEvMemTableRegistered : public TEventLocal<TEvMemTableRegistered, EvMemTableRegistered> { + const ui32 Table; + TIntrusivePtr<ISharedPageCacheMemTableRegistration> Registration; + + TEvMemTableRegistered(ui32 table, TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration) + : Table(table) + , Registration(std::move(registration)) + {} + }; + + struct TEvMemTableCompact : public TEventLocal<TEvMemTableCompact, EvMemTableCompact> { + const ui32 Table; + const ui64 ExpectedSize; + + TEvMemTableCompact(ui32 table, ui64 expectedSize) + : Table(table) + , ExpectedSize(expectedSize) + {} + }; + + struct TEvMemTableCompacted : public TEventLocal<TEvMemTableCompacted, EvMemTableCompacted> { + const TIntrusivePtr<ISharedPageCacheMemTableRegistration> Registration; + + TEvMemTableCompacted(TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration) + : Registration(registration) + {} + }; + + struct TEvMemTableUnregister : public TEventLocal<TEvMemTableUnregister, EvMemTableUnregister> { + const ui32 Table; + + TEvMemTableUnregister(ui32 table) + : Table(table) + {} + }; } } diff --git a/ydb/core/tablet_flat/shared_cache_memtable.h b/ydb/core/tablet_flat/shared_cache_memtable.h new file mode 100644 index 00000000000..e78682bed92 --- /dev/null +++ b/ydb/core/tablet_flat/shared_cache_memtable.h @@ -0,0 +1,23 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr { +namespace NSharedCache { + +class ISharedPageCacheMemTableRegistration : TNonCopyable, public TThrRefBase { +public: + virtual void SetConsumption(ui64 newConsumption) = 0; + virtual ~ISharedPageCacheMemTableRegistration() = default; +}; + +class ISharedPageCacheMemTableObserver : TNonCopyable { +public: + virtual void Register(ui32 table) = 0; + virtual void Unregister(ui32 table) = 0; + virtual void CompactionComplete(TIntrusivePtr<ISharedPageCacheMemTableRegistration> registration) = 0; + virtual ~ISharedPageCacheMemTableObserver() = default; +}; + +} +} diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp index 40d86df5fdc..a35e80af211 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/shared_sausagecache.cpp @@ -27,6 +27,8 @@ TSharedPageCacheCounters::TSharedPageCacheCounters(const TIntrusivePtr<::NMonito , CacheMissBytes(group->GetCounter("CacheMissBytes", true)) , LoadInFlyPages(group->GetCounter("LoadInFlyPages")) , LoadInFlyBytes(group->GetCounter("LoadInFlyBytes")) + , MemTableTotalBytes(group->GetCounter("MemTableTotalBytes")) + , MemTableCompactingBytes(group->GetCounter("MemTableCompactingBytes")) { } } @@ -41,6 +43,147 @@ static bool Satisfies(NLog::EPriority priority = NLog::PRI_DEBUG) { return false; } +class TSharedPageCacheMemTableTracker; + +class TSharedPageCacheMemTableRegistration : public NSharedCache::ISharedPageCacheMemTableRegistration { +public: + TSharedPageCacheMemTableRegistration(TActorId owner, ui32 table, std::weak_ptr<TSharedPageCacheMemTableTracker> tracker) + : Tracker(std::move(tracker)) + , Owner(owner) + , Table(table) + {} + + ui64 GetConsumption() const { + return Consumption.load(); + } + + void SetConsumption(ui64 newConsumption); + +private: + std::weak_ptr<TSharedPageCacheMemTableTracker> Tracker; + std::atomic<ui64> Consumption = 0; + +public: + const TActorId Owner; + const ui32 Table; +}; + +class TSharedPageCacheMemTableTracker : public std::enable_shared_from_this<TSharedPageCacheMemTableTracker> { + friend TSharedPageCacheMemTableRegistration; + +public: + TSharedPageCacheMemTableTracker(TIntrusivePtr<TSharedPageCacheCounters> counters) + : Counters(counters) + {} + + ui64 GetTotalConsumption() { + return TotalConsumption.load(); + } + + ui64 GetTotalCompacting() { + return TotalCompacting; + } + + TIntrusivePtr<TSharedPageCacheMemTableRegistration> Register(TActorId owner, ui32 table) { + auto &ret = Registrations[{owner, table}]; + if (!ret) { + ret = MakeIntrusive<TSharedPageCacheMemTableRegistration>(owner, table, weak_from_this()); + NonCompacting.insert(ret); + } + return ret; + } + + void Unregister(TActorId owner, ui32 table) { + auto it = Registrations.find({owner, table}); + if (it != Registrations.end()) { + auto& registration = it->second; + CompactionComplete(registration); + registration->SetConsumption(0); + Y_VERIFY_DEBUG(NonCompacting.contains(registration)); + NonCompacting.erase(registration); + Registrations.erase(it); + } + } + + void CompactionComplete(TIntrusivePtr<TSharedPageCacheMemTableRegistration> registration) { + auto it = Compacting.find(registration); + if (it != Compacting.end()) { + ChangeTotalCompacting(-it->second); + NonCompacting.insert(it->first); + Compacting.erase(it); + } + } + + /** + * @return registrations and sizes that should be compacted + */ + TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> SelectForCompaction(ui64 toCompact) { + TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> ret; + + if (toCompact <= TotalCompacting) { + // nothing to compact more + return ret; + } + + for (const auto &r : NonCompacting) { + ui64 consumption = r->GetConsumption(); + if (consumption) { + ret.emplace_back(r, consumption); + } + } + + Sort(ret, [](const auto &x, const auto &y) { return x.second > y.second; }); + + size_t take = 0; + for (auto it = ret.begin(); it != ret.end() && toCompact > TotalCompacting; it++) { + auto reg = it->first; + + Compacting[reg] = it->second; + Y_VERIFY(NonCompacting.erase(reg)); + + ChangeTotalCompacting(it->second); + + take++; + } + + ret.resize(take); + return ret; + } + +private: + void ChangeTotalConsumption(ui64 delta) { + TotalConsumption.fetch_add(delta); + if (Counters) { + Counters->MemTableTotalBytes->Add(delta); + } + } + + void ChangeTotalCompacting(ui64 delta) { + TotalCompacting += delta; + if (Counters) { + Counters->MemTableCompactingBytes->Add(delta); + } + } + +private: + TIntrusivePtr<TSharedPageCacheCounters> Counters; + TMap<std::pair<TActorId, ui32>, TIntrusivePtr<TSharedPageCacheMemTableRegistration>> Registrations; + THashSet<TIntrusivePtr<TSharedPageCacheMemTableRegistration>> NonCompacting; + THashMap<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64> Compacting; + std::atomic<ui64> TotalConsumption = 0; + // Approximate value, updates only on compaction start/stop. + // + // Counts only Shared Cache triggered compactions. + ui64 TotalCompacting = 0; +}; + +void TSharedPageCacheMemTableRegistration::SetConsumption(ui64 newConsumption) { + ui64 before = Consumption.exchange(newConsumption); + if (auto t = Tracker.lock()) { + t->ChangeTotalConsumption(newConsumption - before); + } +} + class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { using ELnLev = NUtil::ELnLev; using TBlocks = TVector<NSharedCache::TEvResult::TLoaded>; @@ -165,6 +308,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { THashMap<TLogoBlobID, TCollection> Collections; THashMap<TActorId, TCollectionsOwner> CollectionsOwners; TIntrusivePtr<TMemObserver> MemObserver; + std::shared_ptr<TSharedPageCacheMemTableTracker> MemTableTracker; TRequestQueue AsyncRequests; TRequestQueue ScanRequests; @@ -258,6 +402,37 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { ActualizeCacheSizeLimit(); DoGC(); + + if (MemLimitBytes && MemLimitBytes < ConfigLimitBytes) { + // in normal scenario we expect that we can fill the whole shared cache + ui64 memTableReservedBytes = ConfigLimitBytes * Config->MemTableReservationPercent / 100; + ui64 memTableTotal = MemTableTracker->GetTotalConsumption(); + if (memTableTotal > memTableReservedBytes) { + ui64 toCompact = Min(ConfigLimitBytes - MemLimitBytes, memTableTotal - memTableReservedBytes); + auto registrations = MemTableTracker->SelectForCompaction(toCompact); + for (auto registration : registrations) { + Send(registration.first->Owner, new NSharedCache::TEvMemTableCompact(registration.first->Table, registration.second)); + } + } + } + } + + void Handle(NSharedCache::TEvMemTableRegister::TPtr &ev) { + const auto *msg = ev->Get(); + auto registration = MemTableTracker->Register(ev->Sender, msg->Table); + Send(ev->Sender, new NSharedCache::TEvMemTableRegistered(msg->Table, std::move(registration))); + } + + void Handle(NSharedCache::TEvMemTableUnregister::TPtr &ev) { + const auto *msg = ev->Get(); + MemTableTracker->Unregister(ev->Sender, msg->Table); + } + + void Handle(NSharedCache::TEvMemTableCompacted::TPtr &ev) { + const auto *msg = ev->Get(); + if (auto registration = dynamic_cast<TSharedPageCacheMemTableRegistration*>(msg->Registration.Get())) { + MemTableTracker->CompactionComplete(registration); + } } void Registered(TActorSystem *sys, const TActorId &owner) @@ -1044,6 +1219,9 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { if (msg->Record.HasActivePagesReservationPercent()) { Config->ActivePagesReservationPercent = msg->Record.GetActivePagesReservationPercent(); } + if (msg->Record.HasMemTableReservationPercent()) { + Config->MemTableReservationPercent = msg->Record.GetMemTableReservationPercent(); + } if (msg->Record.GetAsyncQueueInFlyLimit() != 0) { AsyncRequests.Limit = msg->Record.GetAsyncQueueInFlyLimit(); @@ -1114,6 +1292,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { public: TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver) : MemObserver(std::move(memObserver)) + , MemTableTracker(std::make_shared<TSharedPageCacheMemTableTracker>(config->Counters)) , Config(std::move(config)) , Cache(*Config->CacheConfig) , SizeOverride(Config->CacheConfig->Limit, 1, Max<i64>()) @@ -1139,6 +1318,10 @@ public: hFunc(NSharedCache::TEvUnregister, Handle); hFunc(NSharedCache::TEvInvalidate, Handle); hFunc(NSharedCache::TEvMem, Handle); + hFunc(NSharedCache::TEvMemTableRegister, Handle); + hFunc(NSharedCache::TEvMemTableUnregister, Handle); + hFunc(NSharedCache::TEvMemTableCompacted, Handle); + hFunc(NBlockIO::TEvData, Handle); hFunc(TEvSharedPageCache::TEvConfigure, Handle); cFunc(TEvents::TSystem::Wakeup, GCWakeup); diff --git a/ydb/core/tablet_flat/shared_sausagecache.h b/ydb/core/tablet_flat/shared_sausagecache.h index 28ae0825b4b..ce8245f07af 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.h +++ b/ydb/core/tablet_flat/shared_sausagecache.h @@ -38,6 +38,8 @@ struct TSharedPageCacheCounters final : public TAtomicRefCount<TSharedPageCacheC const TCounterPtr CacheMissBytes; const TCounterPtr LoadInFlyPages; const TCounterPtr LoadInFlyBytes; + const TCounterPtr MemTableTotalBytes; + const TCounterPtr MemTableCompactingBytes; explicit TSharedPageCacheCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters> &group); }; @@ -49,6 +51,7 @@ struct TSharedPageCacheConfig { TString CacheName = "SharedPageCache"; TIntrusivePtr<TSharedPageCacheCounters> Counters; ui32 ActivePagesReservationPercent = 50; + ui32 MemTableReservationPercent = 20; }; IActor* CreateSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver); diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h index 5361c1f0c29..44557d41d1e 100644 --- a/ydb/core/tablet_flat/test/libs/exec/runner.h +++ b/ydb/core/tablet_flat/test/libs/exec/runner.h @@ -185,7 +185,7 @@ namespace NFake { config->TotalScanQueueInFlyLimit = conf.ScanQueue; config->Counters = MakeIntrusive<TSharedPageCacheCounters>(Env.GetDynamicCounters()); - auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver()); + auto *actor = CreateSharedPageCache(std::move(config), Env.GetMemObserver()); RunOn(3, MakeSharedPageCacheId(0), actor, EMail::ReadAsFilled); } diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt index f5e5f0917de..ee2423e7d48 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt @@ -72,6 +72,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt index 4723dfaccdf..24d732f728a 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt @@ -75,6 +75,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt index 233cb18aa6b..449b23320ce 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt @@ -76,6 +76,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt index b8a23593306..2b3e5a8d7f1 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt @@ -65,6 +65,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_screen.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_bloom.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_slice_loader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_versions.cpp diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp index 26bdf4f71ee..00944cfb156 100644 --- a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp @@ -89,16 +89,20 @@ struct TTxReadRow : public ITransaction { }; void LogCounters(TIntrusivePtr<TSharedPageCacheCounters> counters) { - Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val() << ", Passive:" << counters->PassiveBytes->Val() << ", MemLimit:" << counters->MemLimitBytes->Val() << Endl; + Cerr << "Counters: Active:" << counters->ActiveBytes->Val() << "/" << counters->ActiveLimitBytes->Val() + << ", Passive:" << counters->PassiveBytes->Val() + << ", MemLimit:" << counters->MemLimitBytes->Val() + << ", MemTable:" << counters->MemTableCompactingBytes->Val() << "/" << counters->MemTableTotalBytes->Val() + << Endl; } -void WaitMemEvent(TMyEnvBase& env) { +void WaitEvent(TMyEnvBase& env, ui32 eventType, ui32 requiredCount = 1) { TDispatchOptions options; - options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1)); + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(eventType, requiredCount)); env->DispatchEvents(options); } -Y_UNIT_TEST(Limits) { +Y_UNIT_TEST(PageCacheLimits) { TMyEnvBase env; auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters()); @@ -121,38 +125,38 @@ Y_UNIT_TEST(Limits) { } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0); - UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); // 2 full layers (fresh & staging) + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3); // 2 full layers (fresh & staging) UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 0); env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB}); - WaitMemEvent(env); + WaitEvent(env, NSharedCache::EvMem); LogCounters(counters); - UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB / 2); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3); UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); env->GetMemObserver()->NotifyStat({101*MB, 110*MB, 100*MB}); - WaitMemEvent(env); + WaitEvent(env, NSharedCache::EvMem); LogCounters(counters); - UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); // 1mb evicted - UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3); // 1mb evicted + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3); UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), counters->ActiveLimitBytes->Val()); env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB}); - WaitMemEvent(env); + WaitEvent(env, NSharedCache::EvMem); LogCounters(counters); - UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB / 2); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2 - MB, MB/ 3); UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); UNIT_ASSERT_VALUES_EQUAL(counters->PassiveBytes->Val(), 7772); UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 5*MB + counters->ActiveBytes->Val() + counters->PassiveBytes->Val()); // +5mb env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB}); - WaitMemEvent(env); + WaitEvent(env, NSharedCache::EvMem); LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(counters->ActiveBytes->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit @@ -160,6 +164,105 @@ Y_UNIT_TEST(Limits) { UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit } +Y_UNIT_TEST(MemTableRegistration) { + TMyEnvBase env; + + // start: + + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + env.SendAsync(new NFake::TEvExecute{ new TTxInitSchema() }); + env.SendAsync(new NFake::TEvExecute{ new TTxWriteRow(1, "asdf") }); + + WaitEvent(env, NSharedCache::EvMemTableRegistered); + + // stop: + + env.SendAsync(new TEvents::TEvPoison); + + WaitEvent(env, NSharedCache::EvUnregister); + + // restart: + + env.FireTablet(env.Edge, env.Tablet, [&](const TActorId &tablet, TTabletStorageInfo *info) { + return new NFake::TDummy(tablet, info, env.Edge, 0); + }); + + WaitEvent(env, NSharedCache::EvMemTableRegistered); +} + +Y_UNIT_TEST(MemTableLimits) { + TMyEnvBase env; + + env->SetLogPriority(NKikimrServices::OPS_COMPACT, NActors::NLog::PRI_INFO); + + auto counters = MakeIntrusive<TSharedPageCacheCounters>(env->GetDynamicCounters()); + + TVector<ui64> tabletIds; + for (size_t tablet = 0; tablet < 10; tablet++) { + tabletIds.push_back(env.Tablet); + env.Tablet++; + } + + for (auto tabletId : tabletIds) { + env.Tablet = tabletId; + + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() }); + + // write 10 rows, each ~100KB + for (i64 key = 0; key < 10; ++key) { + TString value(size_t(100 * 1024), char('a' + key % 26)); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) }); + } + + Cerr << "...compacting" << Endl; + env.SendSync(new NFake::TEvCompact(TableId)); + Cerr << "...waiting until compacted" << Endl; + env.WaitFor<NFake::TEvCompacted>(); + + for (i64 key = 0; key < 10; ++key) { + env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key) }); + } + + // write 10 rows, each ~50KB + for (i64 key = 100; key < 110; ++key) { + TString value(size_t(50 * 1024), char('a' + key % 26)); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) }); + } + } + + UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 5*MB, MB/ 3); + UNIT_ASSERT_VALUES_EQUAL(counters->MemTableCompactingBytes->Val(), 0); + + env->GetMemObserver()->NotifyStat({95*MB, 110*MB, 100*MB}); + WaitEvent(env, NSharedCache::EvMem); + LogCounters(counters); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), 8*MB / 3 * 2, MB/ 3); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB); + UNIT_ASSERT_GE(counters->MemLimitBytes->Val(), 8*MB); // bigger than config value + UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 5*MB, MB/ 3); + + env->GetMemObserver()->NotifyStat({100*MB + 1, 110*MB, 100*MB}); + WaitEvent(env, NSharedCache::EvMem); + LogCounters(counters); + UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveLimitBytes->Val(), 8*MB / 3 * 2, MB/ 3); // 1 page evicted + UNIT_ASSERT_DOUBLES_EQUAL(counters->MemLimitBytes->Val(), 8*MB / 3 * 2, MB/ 3); + + // we want to get ~2.7MB back, it's 6 Mem Tables + env.WaitFor<NFake::TEvCompacted>(6); + UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 2*MB, MB/ 3); + + env->GetMemObserver()->NotifyStat({200*MB, 110*MB, 100*MB}); + WaitEvent(env, NSharedCache::EvMem); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(counters->ActiveLimitBytes->Val(), 1); // zero limit + UNIT_ASSERT_VALUES_EQUAL(counters->MemLimitBytes->Val(), 1); // zero limit + + // compacted all except reservation, it's all except 20% of 8MB = 1.6MB = 3 Mem Tables + env.WaitFor<NFake::TEvCompacted>(1); // so 1 more Mem Table compacted + UNIT_ASSERT_DOUBLES_EQUAL(counters->MemTableTotalBytes->Val(), 1.5*MB, MB/ 3); +} + } // Y_UNIT_TEST_SUITE(TSharedPageCache) } // namespace NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp new file mode 100644 index 00000000000..5f10d679622 --- /dev/null +++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache_memtable.cpp @@ -0,0 +1,374 @@ +#include <library/cpp/testing/unittest/registar.h> +#include "shared_sausagecache.cpp" + +namespace NKikimr { +namespace NTabletFlatExecutor { +Y_UNIT_TEST_SUITE(TSharedPageCacheMemTableTracker) { + +#define VERIFY_TOTAL(expected) \ +do { \ + UNIT_ASSERT_VALUES_EQUAL(tracker->GetTotalConsumption(), expected); \ + UNIT_ASSERT_VALUES_EQUAL(counters->MemTableTotalBytes->Val(), i64(expected)); \ +} while (false) + +#define VERIFY_COMPACTING(expected) \ +do { \ + UNIT_ASSERT_VALUES_EQUAL(tracker->GetTotalCompacting(), expected); \ + UNIT_ASSERT_VALUES_EQUAL(counters->MemTableCompactingBytes->Val(), i64(expected)); \ +} while (false) + +Y_UNIT_TEST(Empty) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + UNIT_ASSERT(tracker->SelectForCompaction(100).empty()); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); +} + +Y_UNIT_TEST(Destruction) { + std::weak_ptr<TSharedPageCacheMemTableTracker> wTracker; + + { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + wTracker = tracker; + + auto registration = tracker->Register(TActorId{1, 10}, 100); + registration->SetConsumption(1000); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + } + + UNIT_ASSERT(!wTracker.lock()); +} + +Y_UNIT_TEST(Register) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + { // non-compacting registration + auto registration = tracker->Register(TActorId{1, 10}, 100); + registration->SetConsumption(1000); + + // weak pointer doesn't use + UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3); + tracker->Unregister(TActorId{1, 10}, 100); + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + registration.Reset(); + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + } + + { // compacting registration + auto registration = tracker->Register(TActorId{1, 10}, 100); + registration->SetConsumption(1000); + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1); + + // weak pointer doesn't use + UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(1000); + + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3); + tracker->Unregister(TActorId{1, 10}, 100); + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + registration.Reset(); + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + } + + { // duplicate registration + auto registration1 = tracker->Register(TActorId{1, 10}, 100); + registration1->SetConsumption(1000); + + auto registration2 = tracker->Register(TActorId{1, 10}, 100); + UNIT_ASSERT_VALUES_EQUAL(registration1, registration2); + + // weak pointer doesn't use + UNIT_ASSERT_VALUES_EQUAL(tracker.use_count(), 1); + + registration2->SetConsumption(2000); + VERIFY_TOTAL(2000); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(registration2->RefCount(), 4); + tracker->Unregister(TActorId{1, 10}, 100); + UNIT_ASSERT_VALUES_EQUAL(registration2->RefCount(), 2); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + registration1.Reset(); + registration2.Reset(); + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + } +} + +Y_UNIT_TEST(Unregister) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + { // double unregister + auto registration = tracker->Register(TActorId{1, 10}, 100); + registration->SetConsumption(1000); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1); + } + + { // unregister and register back + auto registration1 = tracker->Register(TActorId{1, 10}, 100); + registration1->SetConsumption(1000); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + auto registration2 = tracker->Register(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + registration2->SetConsumption(2000); + + VERIFY_TOTAL(2000); + VERIFY_COMPACTING(0); + } + + { // someone else unregister + auto registration = tracker->Register(TActorId{1, 10}, 100); + registration->SetConsumption(1000); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + tracker->Unregister(TActorId{1, 10}, 200); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 3); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(registration->RefCount(), 1); + } +} + +Y_UNIT_TEST(SetConsumption) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + { // non-compacting updates + auto registration = tracker->Register(TActorId{1, 10}, 100); + + registration->SetConsumption(1000); + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + registration->SetConsumption(2000); + VERIFY_TOTAL(2000); + VERIFY_COMPACTING(0); + + registration->SetConsumption(999); + VERIFY_TOTAL(999); + VERIFY_COMPACTING(0); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + } + + { // compacting updates + auto registration = tracker->Register(TActorId{1, 10}, 100); + + registration->SetConsumption(1000); + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1); + + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(1000); + + registration->SetConsumption(2000); + VERIFY_TOTAL(2000); + VERIFY_COMPACTING(1000); + + registration->SetConsumption(999); + VERIFY_TOTAL(999); + VERIFY_COMPACTING(1000); + + tracker->Unregister(TActorId{1, 10}, 100); + + VERIFY_TOTAL(0); + VERIFY_COMPACTING(0); + } +} + +Y_UNIT_TEST(CompactionComplete) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + { + auto registration = tracker->Register(TActorId{1, 10}, 100); + + registration->SetConsumption(1000); + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1).size(), 1); + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(1000); + + tracker->CompactionComplete(registration); + VERIFY_TOTAL(1000); + VERIFY_COMPACTING(0); + } +} + +Y_UNIT_TEST(SelectForCompaction) { + auto counters = MakeIntrusive<TSharedPageCacheCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + auto tracker = std::make_shared<TSharedPageCacheMemTableTracker>(counters); + + TVector<TIntrusivePtr<TSharedPageCacheMemTableRegistration>> registrations; + for (size_t i = 0; i < 3; i++) { + registrations.push_back(tracker->Register(TActorId{1, 10}, 100 + i)); + } + + auto reset = [&]() { + registrations[0]->SetConsumption(3); + registrations[1]->SetConsumption(100); + registrations[2]->SetConsumption(20); + + for (size_t i = 0; i < 3; i++) { + tracker->CompactionComplete(registrations[i]); + } + + VERIFY_TOTAL(123); + VERIFY_COMPACTING(0); + }; + + TVector<std::pair<TIntrusivePtr<TSharedPageCacheMemTableRegistration>, ui64>> expected; + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(100); + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(100); + + reset(); + expected = {{registrations[1], 100}, {registrations[2], 20}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(101), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(120); + + reset(); + expected = {{registrations[1], 100}, {registrations[2], 20}, {registrations[0], 3}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(123), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(123); + + reset(); + expected = {{registrations[1], 100}, {registrations[2], 20}, {registrations[0], 3}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(999), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(123); + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100).size(), 0); // compacts max, not sum + VERIFY_TOTAL(123); + VERIFY_COMPACTING(100); + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + expected = {{registrations[2], 20}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(101), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(120); + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(100); + tracker->CompactionComplete(registrations[1]); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(0); + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + VERIFY_TOTAL(123); + VERIFY_COMPACTING(100); + + reset(); + expected = {{registrations[1], 100}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(1), expected); + registrations[1]->SetConsumption(1000); + VERIFY_TOTAL(1023); + VERIFY_COMPACTING(100); // didn't update + + reset(); + registrations[2]->SetConsumption(0); + expected = {{registrations[1], 100}, {registrations[0], 3}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(999), expected); + VERIFY_TOTAL(103); + VERIFY_COMPACTING(103); + + reset(); + tracker->Unregister(TActorId{1, 10}, 101); + expected = {{registrations[2], 20}, {registrations[0], 3}}; + UNIT_ASSERT_VALUES_EQUAL(tracker->SelectForCompaction(100), expected); + VERIFY_TOTAL(23); + VERIFY_COMPACTING(23); + registrations[1] = tracker->Register(TActorId{1, 10}, 101); +} + +} +} +} diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make index aeae789e3a4..b26924453a6 100644 --- a/ydb/core/tablet_flat/ut/ya.make +++ b/ydb/core/tablet_flat/ut/ya.make @@ -48,6 +48,7 @@ SRCS( ut_screen.cpp ut_bloom.cpp ut_shared_sausagecache.cpp + ut_shared_sausagecache_memtable.cpp ut_slice.cpp ut_slice_loader.cpp ut_versions.cpp diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make index ad1c4fcfe08..4af18689526 100644 --- a/ydb/core/tablet_flat/ya.make +++ b/ydb/core/tablet_flat/ya.make @@ -66,6 +66,7 @@ SRCS( shared_handle.cpp shared_sausagecache.cpp shared_sausagecache.h + shared_cache_memtable.h tablet_flat_executor.h tablet_flat_executor.cpp tablet_flat_executed.h |