diff options
author | kungasc <kungasc@yandex-team.com> | 2023-06-08 20:12:14 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-06-08 20:12:14 +0300 |
commit | fbe8270e3157859963c28daa1b4f7750ded631fa (patch) | |
tree | 9b6da87668b95573e00d920c80093c222c2da469 | |
parent | c97fba6c6c589c28d5801ca15288b7cf1e8eac73 (diff) | |
download | ydb-fbe8270e3157859963c28daa1b4f7750ded631fa.tar.gz |
Simple private cache
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 152 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_tx_env.h | 19 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_sausagecache.cpp | 431 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_sausagecache.h | 136 |
5 files changed, 328 insertions, 412 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index fd9fe16404f..8e2a8ff93d8 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1553,6 +1553,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct THPTimer cpuTimer; + PrivatePageCache->ResetTouchesAndToLoad(true); TPageCollectionTxEnv env(*Database, *PrivatePageCache); TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId); @@ -1619,28 +1620,21 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx); } else { Y_VERIFY(!seat->CapturedMemory); - if (!env.ToLoad && !seat->RequestedMemory && !txc.IsRescheduled()) { + if (!PrivatePageCache->GetStats().CurrentCacheMisses && !seat->RequestedMemory && !txc.IsRescheduled()) { Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type " << NFmt::Do(*seat->Self) << " postoned w/o demands"); } PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx); } + PrivatePageCache->ResetTouchesAndToLoad(false); ActiveTransaction = false; PlanTransactionActivation(); } void TExecutor::UnpinTransactionPages(TSeat &seat) { - for (auto &xinfoid : seat.Pinned) { - if (TPrivatePageCache::TInfo *info = PrivatePageCache->Info(xinfoid.first)) { - for (auto &x : xinfoid.second) { - ui32 pageId = x.first; - TPrivatePageCachePinPad *pad = x.second.Get(); - x.second.Reset(); - PrivatePageCache->Unpin(pageId, pad, info); - } - } - } + size_t unpinnedPages = 0; + PrivatePageCache->UnpinPages(seat.Pinned, unpinnedPages); seat.Pinned.clear(); seat.MemoryTouched = 0; @@ -1672,41 +1666,19 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv & TTxType txType = seat->Self->GetTxType(); ui32 touchedPages = 0; - ui32 touchedBytes = 0; + ui64 touchedBytes = 0; ui32 newPinnedPages = 0; ui32 waitPages = 0; ui32 loadPages = 0; ui64 loadBytes = 0; ui64 prevTouched = seat->MemoryTouched; - // must pin new entries - for (auto &xpair : env.Touches) { - TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first; - auto &pinned = seat->Pinned[pageCollectionInfo->Id]; - for (auto &x : xpair.second) { - // would insert only if first seen - if (pinned.insert(std::make_pair(x, PrivatePageCache->Pin(x, pageCollectionInfo))).second) { - ++newPinnedPages; - seat->MemoryTouched += pageCollectionInfo->GetPage(x)->Size; - } - } - touchedPages += xpair.second.size(); - } + PrivatePageCache->PinTouches(seat->Pinned, touchedPages, newPinnedPages, seat->MemoryTouched); touchedBytes = seat->MemoryTouched - prevTouched; prevTouched = seat->MemoryTouched; - for (auto &xpair : env.ToLoad) { - TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first; - auto &pinned = seat->Pinned[pageCollectionInfo->Id]; - - for (auto &x : xpair.second) { - if (pinned.insert(std::make_pair(x, PrivatePageCache->Pin(x, pageCollectionInfo))).second) { - ++newPinnedPages; - seat->MemoryTouched += pageCollectionInfo->GetPage(x)->Size; - } - } - } + PrivatePageCache->PinToLoad(seat->Pinned, newPinnedPages, seat->MemoryTouched); if (seat->AttachedMemory) Memory->AttachMemory(*seat); @@ -1776,7 +1748,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv & // If memory was allocated and there is nothing to load // then tx may be re-activated. - if (!env.ToLoad) { + if (!PrivatePageCache->GetStats().CurrentCacheMisses) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; @@ -1789,18 +1761,13 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv & auto *const pad = padHolder.Get(); TransactionWaitPads[pad] = std::move(padHolder); - for (auto &xpair : env.ToLoad) { + auto toLoad = PrivatePageCache->GetToLoad(); + for (auto &xpair : toLoad) { TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first; + TVector<NTable::TPageId> &pages = xpair.second; + waitPages += pages.size(); - TVector<NTable::TPageId> pages; - pages.reserve(xpair.second.size()); - - waitPages += xpair.second.size(); - for (auto &x : xpair.second) { - pages.push_back(x); - } - - const std::pair<ui32, ui64> toLoad = PrivatePageCache->Load(pages, pad, pageCollectionInfo); + const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo); if (toLoad.first) { auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages)); @@ -1846,13 +1813,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv const bool isTerminated = seat->TerminationReason != ETerminationReason::None; const TTxType txType = seat->Self->GetTxType(); - ui64 touchedBlocks = 0; - for (auto &xpair : env.Touches) { - TPrivatePageCache::TInfo *pageCollectionInfo = xpair.first; - touchedBlocks += xpair.second.size(); - for (ui32 blockId : xpair.second) - PrivatePageCache->Touch(blockId, pageCollectionInfo); - } + size_t touchedBlocks = PrivatePageCache->GetStats().CurrentCacheHits; Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_TOUCHED_BLOCKS].IncrementFor(touchedBlocks); if (AppTxCounters && txType != UnknownTxType) AppTxCounters->TxCumulative(txType, COUNTER_TT_TOUCHED_BLOCKS).Increment(touchedBlocks); @@ -1862,6 +1823,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv } UnpinTransactionPages(*seat); + Memory->ReleaseMemory(*seat); const double currentBookkeepingTime = seat->CPUBookkeepingTime; @@ -4273,16 +4235,18 @@ ui64 TExecutor::BeginRead(THolder<NTable::ICompactionRead> read) { Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size() + 1; + PrivatePageCache->ResetTouchesAndToLoad(true); TPageCollectionReadEnv env(*PrivatePageCache); bool finished = read->Execute(&env); - if (env.CacheHits) { + if (PrivatePageCache->GetStats().CurrentCacheHits) { // Cache hits are only counted when read is first executed - Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_HITS].Increment(env.CacheHits); + Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_HITS].Increment(PrivatePageCache->GetStats().CurrentCacheHits); } if (finished) { // Optimize for successful read completion + PrivatePageCache->ResetTouchesAndToLoad(false); Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size(); return 0; } @@ -4295,7 +4259,8 @@ ui64 TExecutor::BeginRead(THolder<NTable::ICompactionRead> read) Y_VERIFY(r.second, "Cannot register a new read %" PRIu64, readId); auto* state = &r.first->second; - PostponeCompactionRead(state, &env); + PostponeCompactionRead(state); + PrivatePageCache->ResetTouchesAndToLoad(false); return readId; } @@ -4321,43 +4286,14 @@ void TExecutor::RequestChanges(ui32 table) PlanCompactionChangesActivation(); } -void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollectionReadEnv* env) +void TExecutor::PostponeCompactionRead(TCompactionReadState* state) { - Y_VERIFY(env->ToLoad, "Compaction read postponed with nothing to load"); + Y_VERIFY(PrivatePageCache->GetStats().CurrentCacheMisses, "Compaction read postponed with nothing to load"); size_t newPinnedPages = 0; TCompactionReadState::TPinned pinned; - auto pinTouched = [&](TPrivatePageCache::TInfo* pageCollectionInfo, const THashSet<ui32>& touched) { - auto& newPinned = pinned[pageCollectionInfo->Id]; - if (auto* oldPinned = state->Pinned.FindPtr(pageCollectionInfo->Id)) { - // We had previously pinned pages from this page collection - // Create new or move used old pins to the new map - for (auto pageId : touched) { - if (auto it = oldPinned->find(pageId); it != oldPinned->end()) { - Y_VERIFY_DEBUG(it->second); - newPinned[pageId] = std::move(it->second); - oldPinned->erase(it); - } else { - newPinned[pageId] = PrivatePageCache->Pin(pageId, pageCollectionInfo); - newPinnedPages++; - } - } - } else { - for (auto pageId : touched) { - newPinned[pageId] = PrivatePageCache->Pin(pageId, pageCollectionInfo); - newPinnedPages++; - } - } - }; - - // Everything touched during this read iteration must be pinned - for (auto& touch : env->Touches) { - pinTouched(touch.first, touch.second); - } - for (auto& load : env->ToLoad) { - pinTouched(load.first, load.second); - } + PrivatePageCache->RepinPages(pinned, state->Pinned, newPinnedPages); // Everything not touched during this read iteration must be unpinned size_t unpinnedPages = UnpinCompactionReadPages(state); @@ -4373,16 +4309,13 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect ui32 loadPages = 0; ui64 loadBytes = 0; - for (auto& load : env->ToLoad) { + auto toLoad = PrivatePageCache->GetToLoad(); + for (auto& load : toLoad) { auto* pageCollectionInfo = load.first; + TVector<NTable::TPageId> &pages = load.second; + waitPages += pages.size(); - waitPages += load.second.size(); - TVector<NTable::TPageId> pages(Reserve(load.second.size())); - for (auto pageId : load.second) { - pages.push_back(pageId); - } - - const std::pair<ui32, ui64> toLoad = PrivatePageCache->Load(pages, pad, pageCollectionInfo); + const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo); if (toLoad.first) { auto* req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages)); @@ -4407,7 +4340,7 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_POSTPONED].Increment(1); Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_LOAD_BYTES].Increment(loadBytes); Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_LOAD_PAGES].Increment(loadPages); - Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_MISSES].Increment(env->CacheMisses); + Counters->Cumulative()[TExecutorCounters::COMPACTION_READ_CACHE_MISSES].Increment(PrivatePageCache->GetStats().CurrentCacheMisses); Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize; Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize; @@ -4415,27 +4348,15 @@ void TExecutor::PostponeCompactionRead(TCompactionReadState* state, TPageCollect size_t TExecutor::UnpinCompactionReadPages(TCompactionReadState* state) { - size_t unpinned = 0; - - for (auto& kv : state->Pinned) { - if (auto* info = PrivatePageCache->Info(kv.first)) { - for (auto& x : kv.second) { - auto pageId = x.first; - if (auto* pad = x.second.Get()) { - x.second.Reset(); - PrivatePageCache->Unpin(pageId, pad, info); - unpinned++; - } - } - } - } + size_t unpinnedPages = 0; + PrivatePageCache->UnpinPages(state->Pinned, unpinnedPages); state->Pinned.clear(); Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize; Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize; - return unpinned; + return unpinnedPages; } void TExecutor::PlanCompactionReadActivation() @@ -4457,16 +4378,19 @@ void TExecutor::Handle(TEvPrivate::TEvActivateCompactionRead::TPtr& ev, const TA CompactionReadQueue.pop_front(); if (auto* state = CompactionReads.FindPtr(readId)) { state->Retries++; + PrivatePageCache->ResetTouchesAndToLoad(true); TPageCollectionReadEnv env(*PrivatePageCache); if (state->Read->Execute(&env)) { // Optimize for successful read completion UnpinCompactionReadPages(state); + PrivatePageCache->ResetTouchesAndToLoad(false); CompactionReads.erase(readId); Counters->Simple()[TExecutorCounters::COMPACTION_READ_IN_FLY] = CompactionReads.size(); continue; } - PostponeCompactionRead(state, &env); + PostponeCompactionRead(state); + PrivatePageCache->ResetTouchesAndToLoad(false); } } } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 0dfe303ce49..26821dea7f3 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -598,7 +598,7 @@ class TExecutor // Compaction read support - void PostponeCompactionRead(TCompactionReadState* state, TPageCollectionReadEnv* env); + void PostponeCompactionRead(TCompactionReadState* state); size_t UnpinCompactionReadPages(TCompactionReadState* state); void PlanCompactionReadActivation(); void Handle(TEvPrivate::TEvActivateCompactionRead::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h index 6288d5876ba..994f9fcc570 100644 --- a/ydb/core/tablet_flat/flat_executor_tx_env.h +++ b/ydb/core/tablet_flat/flat_executor_tx_env.h @@ -42,28 +42,11 @@ namespace NTabletFlatExecutor { private: const TSharedData* Lookup(TPrivatePageCache::TInfo *info, TPageId id) noexcept { - if (auto *page = Cache.Lookup(id, info)) { - if (Touches[info].insert(id).second) { - ++CacheHits; - } - return page; - } else { - if (ToLoad[info].insert(id).second) { - ++CacheMisses; - } - return nullptr; - } + return Cache.Lookup(id, info); } public: TPrivatePageCache& Cache; - - /*_ Page collection cache pages load trace */ - - THashMap<TPrivatePageCache::TInfo*, THashSet<ui32>> Touches; - THashMap<TPrivatePageCache::TInfo*, THashSet<ui32>> ToLoad; - size_t CacheHits = 0; - size_t CacheMisses = 0; }; struct TPageCollectionTxEnv : public TPageCollectionReadEnv, public IExecuting { diff --git a/ydb/core/tablet_flat/flat_sausagecache.cpp b/ydb/core/tablet_flat/flat_sausagecache.cpp index 350bfffcd98..9a9c834517e 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.cpp +++ b/ydb/core/tablet_flat/flat_sausagecache.cpp @@ -6,10 +6,8 @@ namespace NTabletFlatExecutor { TPrivatePageCache::TPage::TPage(ui32 size, ui32 pageId, TInfo* info) : LoadState(LoadStateNo) - , CacheGeneration(TCacheCacheConfig::CacheGenNone) , Sticky(false) , SharedPending(false) - , Padding(0) , Size(size) , Id(pageId) , Info(info) @@ -43,8 +41,9 @@ TPrivatePageCache::TInfo::TInfo(const TInfo &info) } TPrivatePageCache::TPrivatePageCache(const TCacheCacheConfig &cacheConfig) - : Cache(cacheConfig) { + // todo: clean up + (void)cacheConfig; } void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) { @@ -55,35 +54,21 @@ void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) { for (const auto& kv : info->PageMap) { auto* page = kv.second.Get(); Y_VERIFY_DEBUG(page); - if (page->LoadState == TPage::LoadStateLoaded && !page->Sticky) { - auto &x = ToTouchShared[page->Info->Id][page->Id]; - if (!page->SharedPending && !page->SharedBody && page->PinnedBody) { - // We keep pinned body around until it's either - // accepted or dropped by the shared cache - page->SharedPending = true; - x = page->PinnedBody; - } - - if (!page->PinPad) { - page->LoadState = TPage::LoadStateNo; - if (!page->SharedPending && page->PinnedBody) { - page->PinnedBody = { }; - } - page->SharedBody.UnUse(); - } - - Y_VERIFY_DEBUG(!page->IsUnnecessary()); - } + if (page->SharedBody) Stats.TotalSharedBody += page->Size; if (page->PinnedBody) Stats.TotalPinnedBody += page->Size; if (page->PinnedBody && !page->SharedBody) Stats.TotalExclusive += page->Size; - if (page->SharedPending) - Stats.TotalSharedPending += page->Size; if (page->Sticky) Stats.TotalSticky += page->Size; + + Y_VERIFY_DEBUG(!page->SharedPending, "New page shouldn't be shared pending"); + TryShareBody(page); + + TryUnload(page); + Y_VERIFY_DEBUG(!page->IsUnnecessary()); } ++info->Users; @@ -143,8 +128,6 @@ bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) { for (const auto& kv : info->PageMap) { auto* page = kv.second.Get(); Y_VERIFY_DEBUG(page); - if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone) - Cache.Evict(page); Y_VERIFY(!page->WaitQueue, "non-empty wait queue in forgotten page."); Y_VERIFY(!page->PinPad, "non-empty pin pad in forgotten page."); @@ -170,10 +153,6 @@ bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) { return !info->Users; } -THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> TPrivatePageCache::GetPrepareSharedTouched() { - return std::move(ToTouchShared); -} - TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) { auto *x = PageCollections.FindPtr(id); if (x) @@ -182,25 +161,6 @@ TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) { return nullptr; } -void TPrivatePageCache::Touch(TPage *page) { - if (!page->Sticky) { - switch (page->LoadState) { - case TPage::LoadStateNo: - return; - case TPage::LoadStateLoaded: - case TPage::LoadStateRequested: - case TPage::LoadStateRequestedAsync: - return Evict(Cache.Touch(page)); - } - } -} - -void TPrivatePageCache::Touch(ui32 pageId, TInfo *info) { - if (auto* page = info->GetPage(pageId)) { - Touch(page); - } -} - void TPrivatePageCache::MarkSticky(ui32 pageId, TInfo *collectionInfo) { TPage *page = collectionInfo->EnsurePage(pageId); if (Y_LIKELY(!page->Sticky)) { @@ -208,24 +168,16 @@ void TPrivatePageCache::MarkSticky(ui32 pageId, TInfo *collectionInfo) { // asynchronously later, so sticky pages may not be loaded yet. page->Sticky = 1; Stats.TotalSticky += page->Size; - - // Sticky pages are not expected to exist in the cache - if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone) { - Cache.Evict(page); - page->CacheGeneration = TCacheCacheConfig::CacheGenNone; - } } } -TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(ui32 pageId, TInfo *info) { - TPage *page = info->EnsurePage(pageId); - if (!page->PinPad) { +TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(TPage *page) { + Y_VERIFY_DEBUG(page); + if (page && !page->PinPad) { page->PinPad = new TPrivatePageCachePinPad(); Stats.PinnedSetSize += page->Size; - // N.B. it's ok not to call Touch here, because pinned pages don't have - // to be part of the cache even when they are loaded. - Restore(page); + TryLoad(page); if (page->LoadState != TPage::LoadStateLoaded) Stats.PinnedLoadSize += page->Size; @@ -234,8 +186,7 @@ TIntrusivePtr<TPrivatePageCachePinPad> TPrivatePageCache::Pin(ui32 pageId, TInfo return page->PinPad; } -void TPrivatePageCache::Unpin(ui32 pageId, TPrivatePageCachePinPad *pad, TInfo *info) { - TPage *page = info->GetPage(pageId); +void TPrivatePageCache::Unpin(TPage *page, TPrivatePageCachePinPad *pad) { if (page && page->PinPad.Get() == pad) { if (page->PinPad.RefCount() == 1) { page->PinPad.Drop(); @@ -243,27 +194,12 @@ void TPrivatePageCache::Unpin(ui32 pageId, TPrivatePageCachePinPad *pad, TInfo * if (page->LoadState != TPage::LoadStateLoaded) Stats.PinnedLoadSize -= page->Size; - if (page->CacheGeneration != TCacheCacheConfig::CacheGenNone || page->Sticky) - return; - - if (page->LoadState == TPage::LoadStateLoaded) { - page->LoadState = TPage::LoadStateNo; - if (!page->SharedPending) { - if (Y_LIKELY(page->PinnedBody)) { - Stats.TotalPinnedBody -= page->Size; - if (!page->SharedBody) { - Stats.TotalExclusive -= page->Size; - } - page->PinnedBody = { }; - } - } - page->SharedBody.UnUse(); - } + TryUnload(page); } } } -std::pair<ui32, ui64> TPrivatePageCache::Load(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info) { +std::pair<ui32, ui64> TPrivatePageCache::Request(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info) { ui32 blocksToRequest = 0; ui64 bytesToRequest = 0; @@ -307,10 +243,9 @@ std::pair<ui32, ui64> TPrivatePageCache::Load(TVector<ui32> &pages, TPrivatePage return std::make_pair(blocksToRequest, bytesToRequest); } -bool TPrivatePageCache::Restore(TPage *page) { - if (page->LoadState == TPage::LoadStateNo && page->SharedPending) { - page->LoadState = TPage::LoadStateLoaded; - return true; +void TPrivatePageCache::TryLoad(TPage *page) { + if (page->LoadState == TPage::LoadStateLoaded) { + return; } if (page->LoadState == TPage::LoadStateNo && page->SharedBody) { @@ -319,7 +254,7 @@ bool TPrivatePageCache::Restore(TPage *page) { Stats.TotalPinnedBody += page->Size; page->PinnedBody = TPinnedPageRef(page->SharedBody).GetData(); page->LoadState = TPage::LoadStateLoaded; - return true; + return; } page->SharedBody.Drop(); @@ -328,32 +263,258 @@ bool TPrivatePageCache::Restore(TPage *page) { Stats.TotalExclusive += page->Size; } } +} - return false; +void TPrivatePageCache::TPrivatePageCache::TryUnload(TPage *page) { + if (page->LoadState == TPage::LoadStateLoaded) { + if (!page->SharedPending && !page->PinPad && !page->Sticky) { + ToTouchShared[page->Info->Id][page->Id]; + page->LoadState = TPage::LoadStateNo; + if (Y_LIKELY(page->PinnedBody)) { + Stats.TotalPinnedBody -= page->Size; + if (!page->SharedBody) { + Stats.TotalExclusive -= page->Size; + } + page->PinnedBody = { }; + } + page->SharedBody.UnUse(); + } + } +} + +// page may be made free after this call +void TPrivatePageCache::TPrivatePageCache::TryEraseIfUnnecessary(TPage *page) { + if (page->IsUnnecessary()) { + if (Y_UNLIKELY(page->PinnedBody)) { + Stats.TotalPinnedBody -= page->Size; + Stats.TotalExclusive -= page->Size; + page->PinnedBody = { }; + } + const ui32 pageId = page->Id; + auto* info = page->Info; + Y_VERIFY_DEBUG(info->PageMap[pageId].Get() == page); + Y_VERIFY(info->PageMap.erase(pageId)); + } +} + +void TPrivatePageCache::TPrivatePageCache::TryShareBody(TPage *page) { + if (page->LoadState == TPage::LoadStateLoaded) { + auto &x = ToTouchShared[page->Info->Id][page->Id]; + if (!page->SharedPending && !page->SharedBody && page->PinnedBody) { + // We keep pinned body around until it's either + // accepted or dropped by the shared cache + page->SharedPending = true; + Stats.TotalSharedPending += page->Size; + x = page->PinnedBody; + } + } } const TSharedData* TPrivatePageCache::Lookup(ui32 pageId, TInfo *info) { + TPage *page = info->EnsurePage(pageId); + + TryLoad(page); + + if (page->LoadState == TPage::LoadStateLoaded) { + if (page->Empty()) { + Touches.PushBack(page); + Stats.CurrentCacheHits++; + } + return &page->PinnedBody; + } + + if (page->Empty()) { + ToLoad.PushBack(page); + Stats.CurrentCacheMisses++; + } + return nullptr; +} + +TSharedPageRef TPrivatePageCache::LookupShared(ui32 pageId, TInfo *info) { TPage *page = info->GetPage(pageId); if (!page) return { }; - if (Restore(page)) { - Touch(page); + if (page->LoadState == TPage::LoadStateLoaded) { + if (page->SharedBody) { + Y_VERIFY_DEBUG(page->SharedBody.IsUsed(), "Loaded page should have used body"); + return page->SharedBody; + } else { + return TSharedPageRef::MakePrivate(page->PinnedBody); + } + } + + if (page->LoadState == TPage::LoadStateNo) { + if (page->SharedBody) { + auto copy = page->SharedBody; + if (copy.Use()) { + return copy; + } + + page->SharedBody.Drop(); + Stats.TotalSharedBody -= page->Size; + if (Y_UNLIKELY(page->PinnedBody)) { + Stats.TotalExclusive += page->Size; + } + } } - return page->GetBody(); + TryEraseIfUnnecessary(page); + return { }; } -TSharedPageRef TPrivatePageCache::LookupShared(ui32 pageId, TInfo *info) { +void TPrivatePageCache::PinTouches(TPinned &pinned, ui32 &touchedPages, ui32 &pinnedPages, ui64 &pinnedMemory) { + for (auto &page : Touches) { + auto &pinnedCollection = pinned[page.Info->Id]; + + // would insert only if first seen + if (pinnedCollection.insert(std::make_pair(page.Id, Pin(&page))).second) { + pinnedPages++; + pinnedMemory += page.Size; + } + touchedPages++; + } +} + +void TPrivatePageCache::PinToLoad(TPinned &pinned, ui32 &pinnedPages, ui64 &pinnedMemory) { + for (auto &page : ToLoad) { + auto &pinnedCollection = pinned[page.Info->Id]; + + // would insert only if first seen + if (pinnedCollection.insert(std::make_pair(page.Id, Pin(&page))).second) { + pinnedPages++; + pinnedMemory += page.Size; + } + } +} + +void TPrivatePageCache::RepinPages(TPinned &newPinned, TPinned &oldPinned, size_t &pinnedPages) { + auto repinTouched = [&](TPage* page) { + auto& newPinnedCollection = newPinned[page->Info->Id]; + + if (auto* oldPinnedCollection = oldPinned.FindPtr(page->Info->Id)) { + // We had previously pinned pages from this page collection + // Create new or move used old pins to the new map + if (auto it = oldPinnedCollection->find(page->Id); it != oldPinnedCollection->end()) { + Y_VERIFY_DEBUG(it->second); + newPinnedCollection[page->Id] = std::move(it->second); + oldPinnedCollection->erase(it); + } else { + newPinnedCollection[page->Id] = Pin(page); + pinnedPages++; + } + } else { + newPinnedCollection[page->Id] = Pin(page); + pinnedPages++; + } + }; + + // Everything touched during this read iteration must be pinned + for (auto& page : Touches) { + repinTouched(&page); + } + for (auto& page : ToLoad) { + repinTouched(&page); + } +} + +void TPrivatePageCache::UnpinPages(TPinned &pinned, size_t &unpinnedPages) { + for (auto &xinfoid : pinned) { + if (TPrivatePageCache::TInfo *info = Info(xinfoid.first)) { + for (auto &x : xinfoid.second) { + ui32 pageId = x.first; + TPrivatePageCachePinPad *pad = x.second.Get(); + x.second.Reset(); + TPage *page = info->GetPage(pageId); + Unpin(page, pad); + unpinnedPages++; + } + } + } +} + +// todo: do we really need that groupping by page collection? +THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> TPrivatePageCache::GetToLoad() const { + THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> result; + for (auto &page : ToLoad) { + result[page.Info].push_back(page.Id); + } + return result; +} + +void TPrivatePageCache::ResetTouchesAndToLoad(bool verifyEmpty) { + if (verifyEmpty) { + Y_VERIFY(!Touches); + Y_VERIFY(!Stats.CurrentCacheHits); + Y_VERIFY(!ToLoad); + Y_VERIFY(!Stats.CurrentCacheMisses); + } + + while (Touches) { + TPage *page = Touches.PopBack(); + TryUnload(page); + } + Stats.CurrentCacheHits = 0; + + while (ToLoad) { + TPage *page = ToLoad.PopBack(); + TryEraseIfUnnecessary(page); + } + Stats.CurrentCacheMisses = 0; +} + +void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRef shared) { TPage *page = info->GetPage(pageId); if (!page) - return { }; + return; + + Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug"); + if (Y_UNLIKELY(!page->SharedPending)) { + return; + } + Y_VERIFY_DEBUG(page->LoadState == TPage::LoadStateLoaded, "Shared pending page should be loaded"); + + // Shared cache accepted our page and provided its shared reference + Stats.TotalSharedPending -= page->Size; + page->SharedPending = false; + if (Y_LIKELY(!page->SharedBody)) { + Stats.TotalSharedBody += page->Size; + if (Y_LIKELY(page->PinnedBody)) { + Stats.TotalExclusive -= page->Size; + } + } + page->SharedBody = std::move(shared); + TryUnload(page); +} + +void TPrivatePageCache::DropSharedBody(TInfo *info, ui32 pageId) { + TPage *page = info->GetPage(pageId); + if (!page) + return; - if (Restore(page)) { - Touch(page); + Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug"); + if (Y_UNLIKELY(page->SharedPending)) { + // Shared cache rejected our page so we should drop it too + Stats.TotalSharedPending -= page->Size; + page->SharedPending = false; + TryUnload(page); } - return page->GetShared(); + if (!page->SharedBody.IsUsed()) { + if (Y_LIKELY(page->SharedBody)) { + Stats.TotalSharedBody -= page->Size; + if (Y_UNLIKELY(page->PinnedBody)) { + Stats.TotalExclusive += page->Size; + } + page->SharedBody = { }; + } + TryEraseIfUnnecessary(page); + } +} + +void TPrivatePageCache::UpdateCacheSize(ui64 cacheSize) { + // todo: clean up + (void)cacheSize; } TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock( @@ -376,9 +537,9 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock( // N.B. we must be careful not to accidentally drop the sticky bit page->Fill(std::move(loaded.Page), page->Sticky); - Stats.TotalSharedBody += page->Size; Stats.TotalPinnedBody += page->Size; + TryUnload(page); TPage::TWaitQueuePtr ret; if (page->WaitQueue) { @@ -392,19 +553,9 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock( page->WaitQueue.Destroy(); } - if (page->CacheGeneration == TCacheCacheConfig::CacheGenNone - && !page->Sticky && !page->PinPad) - { // should be cache refresh by provided profile - Touch(page); - } - return ret; } -void TPrivatePageCache::UpdateCacheSize(ui64 cacheSize) { - Cache.UpdateCacheSize(cacheSize); -} - THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> TPrivatePageCache::DetachPrivatePageCache() { THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> ret; @@ -416,74 +567,8 @@ THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> TPrivatePageCache return ret; } -void TPrivatePageCache::Evict(TPage *pages) { - if (pages == nullptr) - return; - - TPage *page = pages; - for (;;) { - Y_VERIFY(page->CacheGeneration == TCacheCacheConfig::CacheGenEvicted, "evicting non-evicted page with gen %" PRIu32, (ui32)page->CacheGeneration); - page->CacheGeneration = TCacheCacheConfig::CacheGenNone; - - Y_VERIFY(!page->Sticky, "Unexpected sticky page evicted from cache"); - - switch (page->LoadState) { - case TPage::LoadStateNo: - case TPage::LoadStateRequested: - case TPage::LoadStateRequestedAsync: - break; - case TPage::LoadStateLoaded: { - auto &x = ToTouchShared[page->Info->Id][page->Id]; - if (!page->SharedPending && !page->SharedBody && page->PinnedBody) { - // We keep pinned body around until it's either - // accepted or dropped by the shared cache - Stats.TotalSharedPending += page->Size; - page->SharedPending = true; - x = page->PinnedBody; - } - - if (!page->PinPad) { - page->LoadState = TPage::LoadStateNo; - if (!page->SharedPending) { - if (Y_LIKELY(page->PinnedBody)) { - Stats.TotalPinnedBody -= page->Size; - if (!page->SharedBody) { - Stats.TotalExclusive -= page->Size; - } - page->PinnedBody = { }; - } - } - page->SharedBody.UnUse(); - } - - break; - } - default: - Y_FAIL("unknown load state"); - } - - TPage *next = page->Next()->Node(); - if (page != next) { - page->Unlink(); - } - - if (page->IsUnnecessary()) { - if (Y_UNLIKELY(page->PinnedBody)) { - Stats.TotalPinnedBody -= page->Size; - Stats.TotalExclusive -= page->Size; - page->PinnedBody = { }; - } - const ui32 pageId = page->Id; - auto* info = page->Info; - Y_VERIFY_DEBUG(info->PageMap[pageId].Get() == page); - Y_VERIFY(info->PageMap.erase(pageId)); - } - - if (page == next) - break; - - page = next; - } +THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> TPrivatePageCache::GetPrepareSharedTouched() { + return std::move(ToTouchShared); } }} diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h index 4dd2c2895a9..f09852ca025 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.h +++ b/ydb/core/tablet_flat/flat_sausagecache.h @@ -19,6 +19,8 @@ struct TPrivatePageCacheWaitPad : public TExplicitSimpleCounter { }; class TPrivatePageCache { + using TPinned = THashMap<TLogoBlobID, THashMap<ui32, TIntrusivePtr<TPrivatePageCachePinPad>>>; + public: struct TInfo; @@ -31,6 +33,8 @@ public: ui64 TotalSticky = 0; // total number of bytes marked as sticky (never unloaded from memory) ui64 PinnedSetSize = 0; // number of bytes pinned by transactions (even those not currently loaded) ui64 PinnedLoadSize = 0; // number of bytes pinned by transactions (which are currently being loaded) + size_t CurrentCacheHits = 0; // = Touches.Size() + size_t CurrentCacheMisses = 0; // = ToLoad.Size() }; struct TPage : public TIntrusiveListItem<TPage> { @@ -45,10 +49,9 @@ public: }; ui64 LoadState : 2; - ui64 CacheGeneration : 3; ui64 Sticky : 1; ui64 SharedPending : 1; - ui64 Padding : 1; + ui64 : 4; // padding const ui64 Size : 24; const ui64 Id : 32; @@ -83,32 +86,14 @@ public: void Fill(TSharedPageRef shared, bool sticky = false) { Sticky = sticky; SharedPending = false; - LoadState = LoadStateLoaded; SharedBody = std::move(shared); + LoadState = LoadStateLoaded; PinnedBody = TPinnedPageRef(SharedBody).GetData(); } - const TSharedData* GetBody() const noexcept { + const TSharedData* GetPinnedBody() const noexcept { return LoadState == LoadStateLoaded ? &PinnedBody : nullptr; } - - TSharedPageRef GetShared() const noexcept { - if (LoadState == LoadStateLoaded) { - if (SharedBody) { - return SharedBody; - } else { - return TSharedPageRef::MakePrivate(PinnedBody); - } - } - - return { }; - } - - struct TWeight { - static ui64 Get(TPage *x) { - return x->Size; - } - }; }; struct TInfo : public TThrRefBase { @@ -118,7 +103,7 @@ public: const TSharedData* Lookup(ui32 pageId) const noexcept { auto* page = GetPage(pageId); - return page ? page->GetBody() : nullptr; + return page ? page->GetPinnedBody() : nullptr; } TPage* GetPage(ui32 pageId) const noexcept { @@ -141,72 +126,6 @@ public: EnsurePage(loaded.PageId)->Fill(std::move(loaded.Page), sticky); } - void UpdateSharedBody(ui32 pageId, TSharedPageRef shared, TStats *stats) noexcept { - auto* page = GetPage(pageId); - if (!page) { - return; - } - Y_VERIFY_DEBUG(page->SharedPending, "Shared cache accepted a page that is not pending, possible bug"); - if (Y_UNLIKELY(!page->SharedPending)) { - return; - } - // Shared cache accepted our page and provided its shared reference - stats->TotalSharedPending -= page->Size; - page->SharedPending = false; - if (Y_LIKELY(!page->SharedBody)) { - stats->TotalSharedBody += page->Size; - if (Y_LIKELY(page->PinnedBody)) { - stats->TotalExclusive -= page->Size; - } - } - page->SharedBody = std::move(shared); - if (page->LoadState == TPage::LoadStateLoaded) { - if (Y_UNLIKELY(!page->PinnedBody)) - stats->TotalPinnedBody += page->Size; - page->PinnedBody = TPinnedPageRef(page->SharedBody).GetData(); - } else { - if (Y_LIKELY(page->PinnedBody)) - stats->TotalPinnedBody -= page->Size; - page->PinnedBody = { }; - page->SharedBody.UnUse(); - } - } - - void DropSharedBody(ui32 pageId, TStats *stats) noexcept { - if (auto* page = GetPage(pageId)) { - Y_VERIFY_DEBUG(!page->SharedPending, "Shared cache dropped page sharing request, possible bug"); - if (Y_UNLIKELY(page->SharedPending)) { - // Shared cache rejected our page so we should drop it too - stats->TotalSharedPending -= page->Size; - page->SharedPending = false; - if (page->LoadState != TPage::LoadStateLoaded) { - if (Y_LIKELY(page->PinnedBody)) { - stats->TotalPinnedBody -= page->Size; - stats->TotalExclusive -= page->Size; - page->PinnedBody = { }; - } - } - } - if (!page->SharedBody.IsUsed()) { - if (Y_LIKELY(page->SharedBody)) { - stats->TotalSharedBody -= page->Size; - if (Y_UNLIKELY(page->PinnedBody)) { - stats->TotalExclusive += page->Size; - } - page->SharedBody = { }; - } - if (page->IsUnnecessary()) { - if (Y_UNLIKELY(page->PinnedBody)) { - stats->TotalPinnedBody -= page->Size; - stats->TotalExclusive -= page->Size; - page->PinnedBody = { }; - } - PageMap.erase(pageId); - } - } - } - } - const TLogoBlobID Id; const TIntrusiveConstPtr<NPageCollection::IPageCollection> PageCollection; TPageMap<THolder<TPage>> PageMap; @@ -228,28 +147,27 @@ public: TInfo* Info(TLogoBlobID id); - void Touch(ui32 page, TInfo *collectionInfo); - TIntrusivePtr<TPrivatePageCachePinPad> Pin(ui32 page, TInfo *collectionInfo); - void Unpin(ui32 page, TPrivatePageCachePinPad *pad, TInfo *collectionInfo); void MarkSticky(ui32 pageId, TInfo *collectionInfo); const TStats& GetStats() const { return Stats; } - void UpdateSharedBody(TInfo *collectionInfo, ui32 pageId, TSharedPageRef shared) noexcept { - collectionInfo->UpdateSharedBody(pageId, std::move(shared), &Stats); - } - - void DropSharedBody(TInfo *collectionInfo, ui32 pageId) noexcept { - collectionInfo->DropSharedBody(pageId, &Stats); - } - - std::pair<ui32, ui64> Load(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info); // blocks to load, bytes to load + std::pair<ui32, ui64> Request(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info); // blocks to load, bytes to load const TSharedData* Lookup(ui32 page, TInfo *collection); TSharedPageRef LookupShared(ui32 page, TInfo *collection); - TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo); + + void PinTouches(TPinned &pinned, ui32 &touchedPages, ui32 &pinnedPages, ui64 &pinnedMemory); + void PinToLoad(TPinned &pinned, ui32 &pinnedPages, ui64 &pinnedMemory); + void RepinPages(TPinned &newPinned, TPinned &oldPinned, size_t &pinnedPages); + void UnpinPages(TPinned &pinned, size_t &unpinnedPages); + THashMap<TPrivatePageCache::TInfo*, TVector<ui32>> GetToLoad() const; + void ResetTouchesAndToLoad(bool verifyEmpty); + + void UpdateSharedBody(TInfo *collectionInfo, ui32 pageId, TSharedPageRef shared); + void DropSharedBody(TInfo *collectionInfo, ui32 pageId); void UpdateCacheSize(ui64 cacheSize); + TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo); THashMap<TLogoBlobID, TIntrusivePtr<TInfo>> DetachPrivatePageCache(); THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> GetPrepareSharedTouched(); private: @@ -257,11 +175,17 @@ private: THashMap<TLogoBlobID, THashMap<ui32, TSharedData>> ToTouchShared; TStats Stats; - TCacheCache<TPage, TPage::TWeight> Cache; - bool Restore(TPage *page); - void Touch(TPage *page); - void Evict(TPage *pages); + TIntrusiveList<TPage> Touches; + TIntrusiveList<TPage> ToLoad; + + TIntrusivePtr<TPrivatePageCachePinPad> Pin(TPage *page); + void Unpin(TPage *page, TPrivatePageCachePinPad *pad); + + void TryLoad(TPage *page); + void TryUnload(TPage *page); + void TryEraseIfUnnecessary(TPage *page); + void TryShareBody(TPage *page); }; }} |