aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-06-08 20:12:14 +0300
committerkungasc <kungasc@yandex-team.com>2023-06-08 20:12:14 +0300
commitfbe8270e3157859963c28daa1b4f7750ded631fa (patch)
tree9b6da87668b95573e00d920c80093c222c2da469
parentc97fba6c6c589c28d5801ca15288b7cf1e8eac73 (diff)
downloadydb-fbe8270e3157859963c28daa1b4f7750ded631fa.tar.gz
Simple private cache
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp152
-rw-r--r--ydb/core/tablet_flat/flat_executor.h2
-rw-r--r--ydb/core/tablet_flat/flat_executor_tx_env.h19
-rw-r--r--ydb/core/tablet_flat/flat_sausagecache.cpp431
-rw-r--r--ydb/core/tablet_flat/flat_sausagecache.h136
5 files changed, 328 insertions, 412 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index fd9fe16404f..8e2a8ff93d8 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1553,6 +1553,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
THPTimer cpuTimer;
+ PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionTxEnv env(*Database, *PrivatePageCache);
TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
@@ -1619,28 +1620,21 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx);
} else {
Y_VERIFY(!seat->CapturedMemory);
- if (!env.ToLoad && !seat->RequestedMemory && !txc.IsRescheduled()) {
+ if (!PrivatePageCache->GetStats().CurrentCacheMisses && !seat->RequestedMemory && !txc.IsRescheduled()) {
Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type "
<< NFmt::Do(*seat->Self) << " postoned w/o demands");
}
PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx);
}
+ PrivatePageCache->ResetTouchesAndToLoad(false);
ActiveTransaction = false;
PlanTransactionActivation();
}
void TExecutor::UnpinTransactionPages(TSeat &seat) {
- for (auto &xinfoid : seat.Pinned) {
- if (TPrivatePageCache::TInfo *info = PrivatePageCache->Info(xinfoid.first)) {
- for (auto &x : xinfoid.second) {
- ui32 pageId = x.first;
- TPrivatePageCachePinPad *pad = x.second.Get();
- x.second.Reset();
- PrivatePageCache->Unpin(pageId, pad, info);
- }
- }
- }
+ size_t unpinnedPages = 0;
+ PrivatePageCache->UnpinPages(seat.Pinned, unpinnedPages);
seat.Pinned.clear();
seat.MemoryTouched = 0;
@@ -1672,41 +1666,19 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
TTxType txType = seat->Self->GetTxType();
ui32 touchedPages = 0;
- ui32 touchedBytes = 0;
+ ui64 touchedBytes = 0;
ui32 newPinnedPages = 0;
ui32 waitPages = 0;
ui32 loadPages = 0;
ui64 loadBytes = 0;
ui64 prevTouched = seat->MemoryTouched;
- // must pin new entries
- for (auto &xpair : env.Touches) {
- TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first;
- auto &pinned = seat->Pinned[pageCollectionInfo->Id];
- for (auto &x : xpair.second) {
- // would insert only if first seen
- if (pinned.insert(std::make_pair(x, PrivatePageCache->Pin(x, pageCollectionInfo))).second) {
- ++newPinnedPages;
- seat->MemoryTouched += pageCollectionInfo->GetPage(x)->Size;
- }
- }
- touchedPages += xpair.second.size();
- }
+ PrivatePageCache->PinTouches(seat->Pinned, touchedPages, newPinnedPages, seat->MemoryTouched);
touchedBytes = seat->MemoryTouched - prevTouched;
prevTouched = seat->MemoryTouched;
- for (auto &xpair : env.ToLoad) {
- TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first;
- auto &pinned = seat->Pinned[pageCollectionInfo->Id];
-
- for (auto &x : xpair.second) {
- if (pinned.insert(std::make_pair(x, PrivatePageCache->Pin(x, pageCollectionInfo))).second) {
- ++newPinnedPages;
- seat->MemoryTouched += pageCollectionInfo->GetPage(x)->Size;
- }
- }
- }
+ PrivatePageCache->PinToLoad(seat->Pinned, newPinnedPages, seat->MemoryTouched);
if (seat->AttachedMemory)
Memory->AttachMemory(*seat);
@@ -1776,7 +1748,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
// If memory was allocated and there is nothing to load
// then tx may be re-activated.
- if (!env.ToLoad) {
+ if (!PrivatePageCache->GetStats().CurrentCacheMisses) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
@@ -1789,18 +1761,13 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
auto *const pad = padHolder.Get();
TransactionWaitPads[pad] = std::move(padHolder);
- for (auto &xpair : env.ToLoad) {
+ auto toLoad = PrivatePageCache->GetToLoad();
+ for (auto &xpair : toLoad) {
TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first;
+ TVector<NTable::TPageId> &pages = xpair.second;
+ waitPages += pages.size();
- TVector<NTable::TPageId> pages;
- pages.reserve(xpair.second.size());
-
- waitPages += xpair.second.size();
- for (auto &x : xpair.second) {
- pages.push_back(x);
- }
-
- const std::pair<ui32, ui64> toLoad = PrivatePageCache->Load(pages, pad, pageCollectionInfo);
+ const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo);
if (toLoad.first) {
auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages));
@@ -1846,13 +1813,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
const bool isTerminated = seat->TerminationReason != ETerminationReason::None;
const TTxType txType = seat->Self->GetTxType();
- ui64 touchedBlocks = 0;
- for (auto &xpair : env.Touches) {
- TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first;
- touchedBlocks += xpair.second.size();
- for (ui32 blockId : xpair.second)
- PrivatePageCache->Touch(blockId, pageCollectionInfo);
- }
+ size_t touchedBlocks = PrivatePageCache->GetStats().CurrentCacheHits;
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_TOUCHED_BLOCKS].IncrementFor(touchedBlocks);
if (AppTxCounters && txType != UnknownTxType)
AppTxCounters->TxCumulative(txType, COUNTER_TT_TOUCHED_BLOCKS).Increment(touchedBlocks);
@@ -1862,6 +1823,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
}
UnpinTransactionPages(*seat);
+
Memory->ReleaseMemory(*seat);
const double currentBookkeepingTime = seat->CPUBookkeepingTime;
@@ -4273,16 +4235,18 @@ ui64 TExecutor::BeginRead(THolder<NTable::ICompactionRead> read)
{
Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size() + 1;
+ PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionReadEnv env(*PrivatePageCache);
bool finished = read->Execute(&env);
- if (env.CacheHits) {
+ if (PrivatePageCache->GetStats().CurrentCacheHits) {
// Cache hits are only counted when read is first executed
- Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_HITS].Increment(env.CacheHits);
+ Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_HITS].Increment(PrivatePageCache->GetStats().CurrentCacheHits);
}
if (finished) {
// Optimize for successful read completion
+ PrivatePageCache->ResetTouchesAndToLoad(false);
Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size();
return 0;
}
@@ -4295,7 +4259,8 @@ ui64 TExecutor::BeginRead(THolder<NTable::ICompactionRead> read)
Y_VERIFY(r.second, "Cannot register a new read %" PRIu64, readId);
auto* state = &r.first->second;
- PostponeCompactionRead(state, &env);
+ PostponeCompactionRead(state);
+ PrivatePageCache->ResetTouchesAndToLoad(false);
return readId;
}
@@ -4321,43 +4286,14 @@ void TExecutor::RequestChanges(ui32 table)
PlanCompactionChangesActivation();
}
-void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollectionReadEnv* env)
+void TExecutor::PostponeCompactionRead(TCompactionReadState* state)
{
- Y_VERIFY(env->ToLoad, "Compaction read postponed with nothing to load");
+ Y_VERIFY(PrivatePageCache->GetStats().CurrentCacheMisses, "Compaction read postponed with nothing to load");
size_t newPinnedPages = 0;
TCompactionReadState::TPinned pinned;
- auto pinTouched = [&](TPrivatePageCache::TInfo* pageCollectionInfo, const THashSet<ui32>& touched) {
- auto& newPinned = pinned[pageCollectionInfo->Id];
- if (auto* oldPinned = state->Pinned.FindPtr(pageCollectionInfo->Id)) {
- // We had previously pinned pages from this page collection
- // Create new or move used old pins to the new map
- for (auto pageId : touched) {
- if (auto it = oldPinned->find(pageId); it != oldPinned->end()) {
- Y_VERIFY_DEBUG(it->second);
- newPinned[pageId] = std::move(it->second);
- oldPinned->erase(it);
- } else {
- newPinned[pageId] = PrivatePageCache->Pin(pageId, pageCollectionInfo);
- newPinnedPages++;
- }
- }
- } else {
- for (auto pageId : touched) {
- newPinned[pageId] = PrivatePageCache->Pin(pageId, pageCollectionInfo);
- newPinnedPages++;
- }
- }
- };
-
- // Everything touched during this read iteration must be pinned
- for (auto& touch : env->Touches) {
- pinTouched(touch.first, touch.second);
- }
- for (auto& load : env->ToLoad) {
- pinTouched(load.first, load.second);
- }
+ PrivatePageCache->RepinPages(pinned, state->Pinned, newPinnedPages);
// Everything not touched during this read iteration must be unpinned
size_t unpinnedPages = UnpinCompactionReadPages(state);
@@ -4373,16 +4309,13 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect
ui32 loadPages = 0;
ui64 loadBytes = 0;
- for (auto& load : env->ToLoad) {
+ auto toLoad = PrivatePageCache->GetToLoad();
+ for (auto& load : toLoad) {
auto* pageCollectionInfo = load.first;
+ TVector<NTable::TPageId> &pages = load.second;
+ waitPages += pages.size();
- waitPages += load.second.size();
- TVector<NTable::TPageId> pages(Reserve(load.second.size()));
- for (auto pageId : load.second) {
- pages.push_back(pageId);
- }
-
- const std::pair<ui32, ui64> toLoad = PrivatePageCache->Load(pages, pad, pageCollectionInfo);
+ const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo);
if (toLoad.first) {
auto* req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages));
@@ -4407,7 +4340,7 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect
Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_POSTPONED].Increment(1);
Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_LOAD_BYTES].Increment(loadBytes);
Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_LOAD_PAGES].Increment(loadPages);
- Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_MISSES].Increment(env->CacheMisses);
+ Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_MISSES].Increment(PrivatePageCache->GetStats().CurrentCacheMisses);
Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
@@ -4415,27 +4348,15 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect
size_t TExecutor::UnpinCompactionReadPages(TCompactionReadState* state)
{
- size_t unpinned = 0;
-
- for (auto& kv : state->Pinned) {
- if (auto* info = PrivatePageCache->Info(kv.first)) {
- for (auto& x : kv.second) {
- auto pageId = x.first;
- if (auto* pad = x.second.Get()) {
- x.second.Reset();
- PrivatePageCache->Unpin(pageId, pad, info);
- unpinned++;
- }
- }
- }
- }
+ size_t unpinnedPages = 0;
+ PrivatePageCache->UnpinPages(state->Pinned, unpinnedPages);
state->Pinned.clear();
Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
- return unpinned;
+ return unpinnedPages;
}
void TExecutor::PlanCompactionReadActivation()
@@ -4457,16 +4378,19 @@ void TExecutor::Handle(TEvPrivate::TEvActivateCompactionRead::TPtr& ev, const TA
CompactionReadQueue.pop_front();
if (auto* state = CompactionReads.FindPtr(readId)) {
state->Retries++;
+ PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionReadEnv env(*PrivatePageCache);
if (state->Read->Execute(&env)) {
// Optimize for successful read completion
UnpinCompactionReadPages(state);
+ PrivatePageCache->ResetTouchesAndToLoad(false);
CompactionReads.erase(readId);
Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size();
continue;
}
- PostponeCompactionRead(state, &env);
+ PostponeCompactionRead(state);
+ PrivatePageCache->ResetTouchesAndToLoad(false);
}
}
}
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 0dfe303ce49..26821dea7f3 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -598,7 +598,7 @@ class TExecutor
// Compaction read support
- void PostponeCompactionRead(TCompactionReadState* state, TPageCollectionReadEnv* env);
+ void PostponeCompactionRead(TCompactionReadState* state);
size_t UnpinCompactionReadPages(TCompactionReadState* state);
void PlanCompactionReadActivation();
void Handle(TEvPrivate::TEvActivateCompactionRead::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h
index 6288d5876ba..994f9fcc570 100644
--- a/ydb/core/tablet_flat/flat_executor_tx_env.h
+++ b/ydb/core/tablet_flat/flat_executor_tx_env.h
@@ -42,28 +42,11 @@ namespace NTabletFlatExecutor {
private:
const TSharedData* Lookup(TPrivatePageCache::TInfo *info, TPageId id) noexcept
{
- if (auto *page = Cache.Lookup(id, info)) {
- if (Touches[info].insert(id).second) {
- ++CacheHits;
- }
- return page;
- } else {
- if (ToLoad[info].insert(id).second) {
- ++CacheMisses;
- }
- return nullptr;
- }
+ return Cache.Lookup(id, info);
}
public:
TPrivatePageCache& Cache;
-
- /*_ Page collection cache pages load trace */
-
- THashMap<TPrivatePageCache::TInfo*, THashSet<ui32>> Touches;
- THashMap<TPrivatePageCache::TInfo*, THashSet<ui32>> ToLoad;
- size_t CacheHits = 0;
- size_t CacheMisses = 0;
};
struct TPageCollectionTxEnv : public TPageCollectionReadEnv, public IExecuting {
diff --git a/ydb/core/tablet_flat/flat_sausagecache.cpp b/ydb/core/tablet_flat/flat_sausagecache.cpp
index 350bfffcd98..9a9c834517e 100644
--- a/ydb/core/tablet_flat/flat_sausagecache.cpp
+++ b/ydb/core/tablet_flat/flat_sausagecache.cpp
@@ -6,10 +6,8 @@ namespace NTabletFlatExecutor {
TPrivatePageCache::TPage::TPage(ui32 size, ui32 pageId, TInfo* info)
: LoadState(LoadStateNo)
- , CacheGeneration(TCacheCacheConfig::CacheGenNone)
, Sticky(false)
, SharedPending(false)
- , Padding(0)
, Size(size)
, Id(pageId)
, Info(info)
@@ -43,8 +41,9 @@ TPrivatePageCache::TInfo::TInfo(const TInfo &info)
}
TPrivatePageCache::TPrivatePageCache(const TCacheCacheConfig &cacheConfig)
- : Cache(cacheConfig)
{
+ // todo: clean up
+ (void)cacheConfig;
}
void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) {
@@ -55,35 +54,21 @@ void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) {
for (const auto& kv : info->PageMap) {
auto* page = kv.second.Get();
Y_VERIFY_DEBUG(page);
- if (page->LoadState == TPage::LoadStateLoaded && !page->Sticky) {
- auto &x = ToTouchShared[page->Info->Id][page->Id];
- if (!page->SharedPending && !page->SharedBody && page->PinnedBody) {
- // We keep pinned body around until it's either
- // accepted or dropped by the shared cache
- page->SharedPending = true;
- x = page->PinnedBody;
- }
-
- if (!page->PinPad) {
- page->LoadState = TPage::LoadStateNo;
- if (!page->SharedPending && page->PinnedBody) {
- page->PinnedBody = { };
- }
- page->SharedBody.UnUse();
- }
-
- Y_VERIFY_DEBUG(!page->IsUnnecessary());
- }
+
if (page->SharedBody)
Stats.TotalSharedBody += page->Size;
if (page->PinnedBody)
Stats.TotalPinnedBody += page->Size;
if (page->PinnedBody && !page->SharedBody)
Stats.TotalExclusive += page->Size;
- if (page->SharedPending)
- Stats.TotalSharedPending += page->Size;
if (page->Sticky)
Stats.TotalSticky += page->Size;
+
+ Y_VERIFY_DEBUG(!page->SharedPending, "New page shouldn't be shared pending");
+ TryShareBody(page);
+
+ TryUnload(page);
+ Y_VERIFY_DEBUG(!page->IsUnnecessary());
}
++info->Users;
@@ -143,8 +128,6 @@ bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) {
for (const auto& kv : info->PageMap) {
auto* page = kv.second.Get();
Y_VERIFY_DEBUG(page);
- if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone)
- Cache.Evict(page);
Y_VERIFY(!page->WaitQueue, "non-empty wait queue in forgotten page.");
Y_VERIFY(!page->PinPad, "non-empty pin pad in forgotten page.");
@@ -170,10 +153,6 @@ bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) {
return !info->Users;
}
-THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> TPrivatePageCache::GetPrepareSharedTouched() {
- return std::move(ToTouchShared);
-}
-
TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) {
auto *x = PageCollections.FindPtr(id);
if (x)
@@ -182,25 +161,6 @@ TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) {
return nullptr;
}
-void TPrivatePageCache::Touch(TPage *page) {
- if (!page->Sticky) {
- switch (page->LoadState) {
- case TPage::LoadStateNo:
- return;
- case TPage::LoadStateLoaded:
- case TPage::LoadStateRequested:
- case TPage::LoadStateRequestedAsync:
- return Evict(Cache.Touch(page));
- }
- }
-}
-
-void TPrivatePageCache::Touch(ui32 pageId, TInfo *info) {
- if (auto* page = info->GetPage(pageId)) {
- Touch(page);
- }
-}
-
void TPrivatePageCache::MarkSticky(ui32 pageId, TInfo *collectionInfo) {
TPage *page = collectionInfo->EnsurePage(pageId);
if (Y_LIKELY(!page->Sticky)) {
@@ -208,24 +168,16 @@ void TPrivatePageCache::MarkSticky(ui32 pageId, TInfo *collectionInfo) {
// asynchronously later, so sticky pages may not be loaded yet.
page->Sticky = 1;
Stats.TotalSticky += page->Size;
-
- // Sticky pages are not expected to exist in the cache
- if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone) {
- Cache.Evict(page);
- page->CacheGeneration = TCacheCacheConfig::CacheGenNone;
- }
}
}
-TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(ui32 pageId, TInfo *info) {
- TPage *page = info->EnsurePage(pageId);
- if (!page->PinPad) {
+TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(TPage *page) {
+ Y_VERIFY_DEBUG(page);
+ if (page && !page->PinPad) {
page->PinPad = new TPrivatePageCachePinPad();
Stats.PinnedSetSize += page->Size;
- // N.B. it's ok not to call Touch here, because pinned pages don't have
- // to be part of the cache even when they are loaded.
- Restore(page);
+ TryLoad(page);
if (page->LoadState != TPage::LoadStateLoaded)
Stats.PinnedLoadSize += page->Size;
@@ -234,8 +186,7 @@ TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(ui32 pageId, TInfo
return page->PinPad;
}
-void TPrivatePageCache::Unpin(ui32 pageId, TPrivatePageCachePinPad *pad, TInfo *info) {
- TPage *page = info->GetPage(pageId);
+void TPrivatePageCache::Unpin(TPage *page, TPrivatePageCachePinPad *pad) {
if (page && page->PinPad.Get() == pad) {
if (page->PinPad.RefCount() == 1) {
page->PinPad.Drop();
@@ -243,27 +194,12 @@ void TPrivatePageCache::Unpin(ui32 pageId, TPrivatePageCachePinPad *pad, TInfo *
if (page->LoadState != TPage::LoadStateLoaded)
Stats.PinnedLoadSize -= page->Size;
- if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone || page->Sticky)
- return;
-
- if (page->LoadState == TPage::LoadStateLoaded) {
- page->LoadState = TPage::LoadStateNo;
- if (!page->SharedPending) {
- if (Y_LIKELY(page->PinnedBody)) {
- Stats.TotalPinnedBody -= page->Size;
- if (!page->SharedBody) {
- Stats.TotalExclusive -= page->Size;
- }
- page->PinnedBody = { };
- }
- }
- page->SharedBody.UnUse();
- }
+ TryUnload(page);
}
}
}
-std::pair<ui32, ui64> TPrivatePageCache::Load(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info) {
+std::pair<ui32, ui64> TPrivatePageCache::Request(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info) {
ui32 blocksToRequest = 0;
ui64 bytesToRequest = 0;
@@ -307,10 +243,9 @@ std::pair<ui32, ui64> TPrivatePageCache::Load(TVector<ui32> &pages, TPrivatePage
return std::make_pair(blocksToRequest, bytesToRequest);
}
-bool TPrivatePageCache::Restore(TPage *page) {
- if (page->LoadState == TPage::LoadStateNo && page->SharedPending) {
- page->LoadState = TPage::LoadStateLoaded;
- return true;
+void TPrivatePageCache::TryLoad(TPage *page) {
+ if (page->LoadState == TPage::LoadStateLoaded) {
+ return;
}
if (page->LoadState == TPage::LoadStateNo && page->SharedBody) {
@@ -319,7 +254,7 @@ bool TPrivatePageCache::Restore(TPage *page) {
Stats.TotalPinnedBody += page->Size;
page->PinnedBody = TPinnedPageRef(page->SharedBody).GetData();
page->LoadState = TPage::LoadStateLoaded;
- return true;
+ return;
}
page->SharedBody.Drop();
@@ -328,32 +263,258 @@ bool TPrivatePageCache::Restore(TPage *page) {
Stats.TotalExclusive += page->Size;
}
}
+}
- return false;
+void TPrivatePageCache::TPrivatePageCache::TryUnload(TPage *page) {
+ if (page->LoadState == TPage::LoadStateLoaded) {
+ if (!page->SharedPending && !page->PinPad && !page->Sticky) {
+ ToTouchShared[page->Info->Id][page->Id];
+ page->LoadState = TPage::LoadStateNo;
+ if (Y_LIKELY(page->PinnedBody)) {
+ Stats.TotalPinnedBody -= page->Size;
+ if (!page->SharedBody) {
+ Stats.TotalExclusive -= page->Size;
+ }
+ page->PinnedBody = { };
+ }
+ page->SharedBody.UnUse();
+ }
+ }
+}
+
+// page may be made free after this call
+void TPrivatePageCache::TPrivatePageCache::TryEraseIfUnnecessary(TPage *page) {
+ if (page->IsUnnecessary()) {
+ if (Y_UNLIKELY(page->PinnedBody)) {
+ Stats.TotalPinnedBody -= page->Size;
+ Stats.TotalExclusive -= page->Size;
+ page->PinnedBody = { };
+ }
+ const ui32 pageId = page->Id;
+ auto* info = page->Info;
+ Y_VERIFY_DEBUG(info->PageMap[pageId].Get() == page);
+ Y_VERIFY(info->PageMap.erase(pageId));
+ }
+}
+
+void TPrivatePageCache::TPrivatePageCache::TryShareBody(TPage *page) {
+ if (page->LoadState == TPage::LoadStateLoaded) {
+ auto &x = ToTouchShared[page->Info->Id][page->Id];
+ if (!page->SharedPending && !page->SharedBody && page->PinnedBody) {
+ // We keep pinned body around until it's either
+ // accepted or dropped by the shared cache
+ page->SharedPending = true;
+ Stats.TotalSharedPending += page->Size;
+ x = page->PinnedBody;
+ }
+ }
}
const TSharedData* TPrivatePageCache::Lookup(ui32 pageId, TInfo *info) {
+ TPage *page = info->EnsurePage(pageId);
+
+ TryLoad(page);
+
+ if (page->LoadState == TPage::LoadStateLoaded) {
+ if (page->Empty()) {
+ Touches.PushBack(page);
+ Stats.CurrentCacheHits++;
+ }
+ return &page->PinnedBody;
+ }
+
+ if (page->Empty()) {
+ ToLoad.PushBack(page);
+ Stats.CurrentCacheMisses++;
+ }
+ return nullptr;
+}
+
+TSharedPageRef TPrivatePageCache::LookupShared(ui32 pageId, TInfo *info) {
TPage *page = info->GetPage(pageId);
if (!page)
return { };
- if (Restore(page)) {
- Touch(page);
+ if (page->LoadState == TPage::LoadStateLoaded) {
+ if (page->SharedBody) {
+ Y_VERIFY_DEBUG(page->SharedBody.IsUsed(), "Loaded page should have used body");
+ return page->SharedBody;
+ } else {
+ return TSharedPageRef::MakePrivate(page->PinnedBody);
+ }
+ }
+
+ if (page->LoadState == TPage::LoadStateNo) {
+ if (page->SharedBody) {
+ auto copy = page->SharedBody;
+ if (copy.Use()) {
+ return copy;
+ }
+
+ page->SharedBody.Drop();
+ Stats.TotalSharedBody -= page->Size;
+ if (Y_UNLIKELY(page->PinnedBody)) {
+ Stats.TotalExclusive += page->Size;
+ }
+ }
}
- return page->GetBody();
+ TryEraseIfUnnecessary(page);
+ return { };
}
-TSharedPageRef TPrivatePageCache::LookupShared(ui32 pageId, TInfo *info) {
+void TPrivatePageCache::PinTouches(TPinned &pinned, ui32 &touchedPages, ui32 &pinnedPages, ui64 &pinnedMemory) {
+ for (auto &page : Touches) {
+ auto &pinnedCollection = pinned[page.Info->Id];
+
+ // would insert only if first seen
+ if (pinnedCollection.insert(std::make_pair(page.Id, Pin(&page))).second) {
+ pinnedPages++;
+ pinnedMemory += page.Size;
+ }
+ touchedPages++;
+ }
+}
+
+void TPrivatePageCache::PinToLoad(TPinned &pinned, ui32 &pinnedPages, ui64 &pinnedMemory) {
+ for (auto &page : ToLoad) {
+ auto &pinnedCollection = pinned[page.Info->Id];
+
+ // would insert only if first seen
+ if (pinnedCollection.insert(std::make_pair(page.Id, Pin(&page))).second) {
+ pinnedPages++;
+ pinnedMemory += page.Size;
+ }
+ }
+}
+
+void TPrivatePageCache::RepinPages(TPinned &newPinned, TPinned &oldPinned, size_t &pinnedPages) {
+ auto repinTouched = [&](TPage* page) {
+ auto& newPinnedCollection = newPinned[page->Info->Id];
+
+ if (auto* oldPinnedCollection = oldPinned.FindPtr(page->Info->Id)) {
+ // We had previously pinned pages from this page collection
+ // Create new or move used old pins to the new map
+ if (auto it = oldPinnedCollection->find(page->Id); it != oldPinnedCollection->end()) {
+ Y_VERIFY_DEBUG(it->second);
+ newPinnedCollection[page->Id] = std::move(it->second);
+ oldPinnedCollection->erase(it);
+ } else {
+ newPinnedCollection[page->Id] = Pin(page);
+ pinnedPages++;
+ }
+ } else {
+ newPinnedCollection[page->Id] = Pin(page);
+ pinnedPages++;
+ }
+ };
+
+ // Everything touched during this read iteration must be pinned
+ for (auto& page : Touches) {
+ repinTouched(&page);
+ }
+ for (auto& page : ToLoad) {
+ repinTouched(&page);
+ }
+}
+
+void TPrivatePageCache::UnpinPages(TPinned &pinned, size_t &unpinnedPages) {
+ for (auto &xinfoid : pinned) {
+ if (TPrivatePageCache::TInfo *info = Info(xinfoid.first)) {
+ for (auto &x : xinfoid.second) {
+ ui32 pageId = x.first;
+ TPrivatePageCachePinPad *pad = x.second.Get();
+ x.second.Reset();
+ TPage *page = info->GetPage(pageId);
+ Unpin(page, pad);
+ unpinnedPages++;
+ }
+ }
+ }
+}
+
+// todo: do we really need that groupping by page collection?
+THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> TPrivatePageCache::GetToLoad() const {
+ THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> result;
+ for (auto &page : ToLoad) {
+ result[page.Info].push_back(page.Id);
+ }
+ return result;
+}
+
+void TPrivatePageCache::ResetTouchesAndToLoad(bool verifyEmpty) {
+ if (verifyEmpty) {
+ Y_VERIFY(!Touches);
+ Y_VERIFY(!Stats.CurrentCacheHits);
+ Y_VERIFY(!ToLoad);
+ Y_VERIFY(!Stats.CurrentCacheMisses);
+ }
+
+ while (Touches) {
+ TPage *page = Touches.PopBack();
+ TryUnload(page);
+ }
+ Stats.CurrentCacheHits = 0;
+
+ while (ToLoad) {
+ TPage *page = ToLoad.PopBack();
+ TryEraseIfUnnecessary(page);
+ }
+ Stats.CurrentCacheMisses = 0;
+}
+
+void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRef shared) {
TPage *page = info->GetPage(pageId);
if (!page)
- return { };
+ return;
+
+ Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug");
+ if (Y_UNLIKELY(!page->SharedPending)) {
+ return;
+ }
+ Y_VERIFY_DEBUG(page->LoadState == TPage::LoadStateLoaded, "Shared pending page should be loaded");
+
+ // Shared cache accepted our page and provided its shared reference
+ Stats.TotalSharedPending -= page->Size;
+ page->SharedPending = false;
+ if (Y_LIKELY(!page->SharedBody)) {
+ Stats.TotalSharedBody += page->Size;
+ if (Y_LIKELY(page->PinnedBody)) {
+ Stats.TotalExclusive -= page->Size;
+ }
+ }
+ page->SharedBody = std::move(shared);
+ TryUnload(page);
+}
+
+void TPrivatePageCache::DropSharedBody(TInfo *info, ui32 pageId) {
+ TPage *page = info->GetPage(pageId);
+ if (!page)
+ return;
- if (Restore(page)) {
- Touch(page);
+ Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug");
+ if (Y_UNLIKELY(page->SharedPending)) {
+ // Shared cache rejected our page so we should drop it too
+ Stats.TotalSharedPending -= page->Size;
+ page->SharedPending = false;
+ TryUnload(page);
}
- return page->GetShared();
+ if (!page->SharedBody.IsUsed()) {
+ if (Y_LIKELY(page->SharedBody)) {
+ Stats.TotalSharedBody -= page->Size;
+ if (Y_UNLIKELY(page->PinnedBody)) {
+ Stats.TotalExclusive += page->Size;
+ }
+ page->SharedBody = { };
+ }
+ TryEraseIfUnnecessary(page);
+ }
+}
+
+void TPrivatePageCache::UpdateCacheSize(ui64 cacheSize) {
+ // todo: clean up
+ (void)cacheSize;
}
TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock(
@@ -376,9 +537,9 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock(
// N.B. we must be careful not to accidentally drop the sticky bit
page->Fill(std::move(loaded.Page), page->Sticky);
-
Stats.TotalSharedBody += page->Size;
Stats.TotalPinnedBody += page->Size;
+ TryUnload(page);
TPage::TWaitQueuePtr ret;
if (page->WaitQueue) {
@@ -392,19 +553,9 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock(
page->WaitQueue.Destroy();
}
- if (page->CacheGeneration == TCacheCacheConfig::CacheGenNone
- && !page->Sticky && !page->PinPad)
- { // should be cache refresh by provided profile
- Touch(page);
- }
-
return ret;
}
-void TPrivatePageCache::UpdateCacheSize(ui64 cacheSize) {
- Cache.UpdateCacheSize(cacheSize);
-}
-
THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> TPrivatePageCache::DetachPrivatePageCache() {
THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> ret;
@@ -416,74 +567,8 @@ THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> TPrivatePageCache
return ret;
}
-void TPrivatePageCache::Evict(TPage *pages) {
- if (pages == nullptr)
- return;
-
- TPage *page = pages;
- for (;;) {
- Y_VERIFY(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "evicting non-evicted page with gen %" PRIu32, (ui32)page->CacheGeneration);
- page->CacheGeneration = TCacheCacheConfig::CacheGenNone;
-
- Y_VERIFY(!page->Sticky, "Unexpected sticky page evicted from cache");
-
- switch (page->LoadState) {
- case TPage::LoadStateNo:
- case TPage::LoadStateRequested:
- case TPage::LoadStateRequestedAsync:
- break;
- case TPage::LoadStateLoaded: {
- auto &x = ToTouchShared[page->Info->Id][page->Id];
- if (!page->SharedPending && !page->SharedBody && page->PinnedBody) {
- // We keep pinned body around until it's either
- // accepted or dropped by the shared cache
- Stats.TotalSharedPending += page->Size;
- page->SharedPending = true;
- x = page->PinnedBody;
- }
-
- if (!page->PinPad) {
- page->LoadState = TPage::LoadStateNo;
- if (!page->SharedPending) {
- if (Y_LIKELY(page->PinnedBody)) {
- Stats.TotalPinnedBody -= page->Size;
- if (!page->SharedBody) {
- Stats.TotalExclusive -= page->Size;
- }
- page->PinnedBody = { };
- }
- }
- page->SharedBody.UnUse();
- }
-
- break;
- }
- default:
- Y_FAIL("unknown load state");
- }
-
- TPage *next = page->Next()->Node();
- if (page != next) {
- page->Unlink();
- }
-
- if (page->IsUnnecessary()) {
- if (Y_UNLIKELY(page->PinnedBody)) {
- Stats.TotalPinnedBody -= page->Size;
- Stats.TotalExclusive -= page->Size;
- page->PinnedBody = { };
- }
- const ui32 pageId = page->Id;
- auto* info = page->Info;
- Y_VERIFY_DEBUG(info->PageMap[pageId].Get() == page);
- Y_VERIFY(info->PageMap.erase(pageId));
- }
-
- if (page == next)
- break;
-
- page = next;
- }
+THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> TPrivatePageCache::GetPrepareSharedTouched() {
+ return std::move(ToTouchShared);
}
}}
diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h
index 4dd2c2895a9..f09852ca025 100644
--- a/ydb/core/tablet_flat/flat_sausagecache.h
+++ b/ydb/core/tablet_flat/flat_sausagecache.h
@@ -19,6 +19,8 @@ struct TPrivatePageCacheWaitPad : public TExplicitSimpleCounter {
};
class TPrivatePageCache {
+ using TPinned = THashMap<TLogoBlobID, THashMap<ui32, TIntrusivePtr<TPrivatePageCachePinPad>>>;
+
public:
struct TInfo;
@@ -31,6 +33,8 @@ public:
ui64 TotalSticky = 0; // total number of bytes marked as sticky (never unloaded from memory)
ui64 PinnedSetSize = 0; // number of bytes pinned by transactions (even those not currently loaded)
ui64 PinnedLoadSize = 0; // number of bytes pinned by transactions (which are currently being loaded)
+ size_t CurrentCacheHits = 0; // = Touches.Size()
+ size_t CurrentCacheMisses = 0; // = ToLoad.Size()
};
struct TPage : public TIntrusiveListItem<TPage> {
@@ -45,10 +49,9 @@ public:
};
ui64 LoadState : 2;
- ui64 CacheGeneration : 3;
ui64 Sticky : 1;
ui64 SharedPending : 1;
- ui64 Padding : 1;
+ ui64 : 4; // padding
const ui64 Size : 24;
const ui64 Id : 32;
@@ -83,32 +86,14 @@ public:
void Fill(TSharedPageRef shared, bool sticky = false) {
Sticky = sticky;
SharedPending = false;
- LoadState = LoadStateLoaded;
SharedBody = std::move(shared);
+ LoadState = LoadStateLoaded;
PinnedBody = TPinnedPageRef(SharedBody).GetData();
}
- const TSharedData* GetBody() const noexcept {
+ const TSharedData* GetPinnedBody() const noexcept {
return LoadState == LoadStateLoaded ? &PinnedBody : nullptr;
}
-
- TSharedPageRef GetShared() const noexcept {
- if (LoadState == LoadStateLoaded) {
- if (SharedBody) {
- return SharedBody;
- } else {
- return TSharedPageRef::MakePrivate(PinnedBody);
- }
- }
-
- return { };
- }
-
- struct TWeight {
- static ui64 Get(TPage *x) {
- return x->Size;
- }
- };
};
struct TInfo : public TThrRefBase {
@@ -118,7 +103,7 @@ public:
const TSharedData* Lookup(ui32 pageId) const noexcept {
auto* page = GetPage(pageId);
- return page ? page->GetBody() : nullptr;
+ return page ? page->GetPinnedBody() : nullptr;
}
TPage* GetPage(ui32 pageId) const noexcept {
@@ -141,72 +126,6 @@ public:
EnsurePage(loaded.PageId)->Fill(std::move(loaded.Page), sticky);
}
- void UpdateSharedBody(ui32 pageId, TSharedPageRef shared, TStats *stats) noexcept {
- auto* page = GetPage(pageId);
- if (!page) {
- return;
- }
- Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug");
- if (Y_UNLIKELY(!page->SharedPending)) {
- return;
- }
- // Shared cache accepted our page and provided its shared reference
- stats->TotalSharedPending -= page->Size;
- page->SharedPending = false;
- if (Y_LIKELY(!page->SharedBody)) {
- stats->TotalSharedBody += page->Size;
- if (Y_LIKELY(page->PinnedBody)) {
- stats->TotalExclusive -= page->Size;
- }
- }
- page->SharedBody = std::move(shared);
- if (page->LoadState == TPage::LoadStateLoaded) {
- if (Y_UNLIKELY(!page->PinnedBody))
- stats->TotalPinnedBody += page->Size;
- page->PinnedBody = TPinnedPageRef(page->SharedBody).GetData();
- } else {
- if (Y_LIKELY(page->PinnedBody))
- stats->TotalPinnedBody -= page->Size;
- page->PinnedBody = { };
- page->SharedBody.UnUse();
- }
- }
-
- void DropSharedBody(ui32 pageId, TStats *stats) noexcept {
- if (auto* page = GetPage(pageId)) {
- Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug");
- if (Y_UNLIKELY(page->SharedPending)) {
- // Shared cache rejected our page so we should drop it too
- stats->TotalSharedPending -= page->Size;
- page->SharedPending = false;
- if (page->LoadState != TPage::LoadStateLoaded) {
- if (Y_LIKELY(page->PinnedBody)) {
- stats->TotalPinnedBody -= page->Size;
- stats->TotalExclusive -= page->Size;
- page->PinnedBody = { };
- }
- }
- }
- if (!page->SharedBody.IsUsed()) {
- if (Y_LIKELY(page->SharedBody)) {
- stats->TotalSharedBody -= page->Size;
- if (Y_UNLIKELY(page->PinnedBody)) {
- stats->TotalExclusive += page->Size;
- }
- page->SharedBody = { };
- }
- if (page->IsUnnecessary()) {
- if (Y_UNLIKELY(page->PinnedBody)) {
- stats->TotalPinnedBody -= page->Size;
- stats->TotalExclusive -= page->Size;
- page->PinnedBody = { };
- }
- PageMap.erase(pageId);
- }
- }
- }
- }
-
const TLogoBlobID Id;
const TIntrusiveConstPtr<NPageCollection::IPageCollection> PageCollection;
TPageMap<THolder<TPage>> PageMap;
@@ -228,28 +147,27 @@ public:
TInfo* Info(TLogoBlobID id);
- void Touch(ui32 page, TInfo *collectionInfo);
- TIntrusivePtr<TPrivatePageCachePinPad> Pin(ui32 page, TInfo *collectionInfo);
- void Unpin(ui32 page, TPrivatePageCachePinPad *pad, TInfo *collectionInfo);
void MarkSticky(ui32 pageId, TInfo *collectionInfo);
const TStats& GetStats() const { return Stats; }
- void UpdateSharedBody(TInfo *collectionInfo, ui32 pageId, TSharedPageRef shared) noexcept {
- collectionInfo->UpdateSharedBody(pageId, std::move(shared), &Stats);
- }
-
- void DropSharedBody(TInfo *collectionInfo, ui32 pageId) noexcept {
- collectionInfo->DropSharedBody(pageId, &Stats);
- }
-
- std::pair<ui32, ui64> Load(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info); // blocks to load, bytes to load
+ std::pair<ui32, ui64> Request(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info); // blocks to load, bytes to load
const TSharedData* Lookup(ui32 page, TInfo *collection);
TSharedPageRef LookupShared(ui32 page, TInfo *collection);
- TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo);
+
+ void PinTouches(TPinned &pinned, ui32 &touchedPages, ui32 &pinnedPages, ui64 &pinnedMemory);
+ void PinToLoad(TPinned &pinned, ui32 &pinnedPages, ui64 &pinnedMemory);
+ void RepinPages(TPinned &newPinned, TPinned &oldPinned, size_t &pinnedPages);
+ void UnpinPages(TPinned &pinned, size_t &unpinnedPages);
+ THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> GetToLoad() const;
+ void ResetTouchesAndToLoad(bool verifyEmpty);
+
+ void UpdateSharedBody(TInfo *collectionInfo, ui32 pageId, TSharedPageRef shared);
+ void DropSharedBody(TInfo *collectionInfo, ui32 pageId);
void UpdateCacheSize(ui64 cacheSize);
+ TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo);
THashMap<TLogoBlobID, TIntrusivePtr<TInfo>> DetachPrivatePageCache();
THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> GetPrepareSharedTouched();
private:
@@ -257,11 +175,17 @@ private:
THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> ToTouchShared;
TStats Stats;
- TCacheCache<TPage, TPage::TWeight> Cache;
- bool Restore(TPage *page);
- void Touch(TPage *page);
- void Evict(TPage *pages);
+ TIntrusiveList<TPage> Touches;
+ TIntrusiveList<TPage> ToLoad;
+
+ TIntrusivePtr<TPrivatePageCachePinPad> Pin(TPage *page);
+ void Unpin(TPage *page, TPrivatePageCachePinPad *pad);
+
+ void TryLoad(TPage *page);
+ void TryUnload(TPage *page);
+ void TryEraseIfUnnecessary(TPage *page);
+ void TryShareBody(TPage *page);
};
}}