diff options
author | kungurtsev <kungasc@ydb.tech> | 2024-12-05 09:45:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-05 09:45:49 +0100 |
commit | 7d9ab3e2cb4b049fecdc31901ed993cfd3b5a81e (patch) | |
tree | f58f27e121bcb70eb5175cfd1d0d47e72f567f6e | |
parent | d917bcb1d7dfb27990422185c73940d7f248b73d (diff) | |
download | ydb-7d9ab3e2cb4b049fecdc31901ed993cfd3b5a81e.tar.gz |
Turn Scan memory into passive Shared Cache memory (#12295)
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_blobs.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_cache.h | 10 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_conf.h | 13 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_env.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_iface.h | 3 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_misc.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_fwd_page.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_ops_compact.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_part_loader.h | 29 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_sausagecache.h | 5 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_scan_actor.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_scan_spent.h | 6 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_envs.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_forward.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__stats.cpp | 2 |
15 files changed, 74 insertions, 45 deletions
diff --git a/ydb/core/tablet_flat/flat_fwd_blobs.h b/ydb/core/tablet_flat/flat_fwd_blobs.h index 1fad64cc7d3..8b910633de3 100644 --- a/ydb/core/tablet_flat/flat_fwd_blobs.h +++ b/ydb/core/tablet_flat/flat_fwd_blobs.h @@ -54,7 +54,7 @@ namespace NFwd { Preload(head, upper); } - void Fill(NPageCollection::TLoadedPage& page, EPage) noexcept override + void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage) noexcept override { if (!Pages || page.PageId < Pages.front().PageId) { Y_ABORT("Blobs fwd cache got page below queue"); @@ -66,7 +66,7 @@ namespace NFwd { Stat.Saved += page.Data.size(); OnFetch -= page.Data.size(); - OnHold += Lookup(page.PageId).Settle(page); + OnHold += Lookup(page.PageId).Settle(page, std::move(sharedPageRef)); Shrink(false /* do not drop loading pages */); } diff --git a/ydb/core/tablet_flat/flat_fwd_cache.h b/ydb/core/tablet_flat/flat_fwd_cache.h index 7f070ff132c..f23ca583ea5 100644 --- a/ydb/core/tablet_flat/flat_fwd_cache.h +++ b/ydb/core/tablet_flat/flat_fwd_cache.h @@ -160,7 +160,7 @@ namespace NFwd { } } - void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept override + void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept override { Stat.Saved += page.Data.size(); @@ -169,7 +169,7 @@ namespace NFwd { Y_ABORT_UNLESS(page.PageId == IndexPage.PageId); Index.emplace(page.Data); Iter = Index->LookupRow(BeginRowId); - IndexPage.Settle(page); + IndexPage.Settle(page, std::move(sharedPageRef)); return; } @@ -180,7 +180,7 @@ namespace NFwd { Y_ABORT_UNLESS(page.Data.size() <= OnFetch, "Forward cache ahead counters is out of sync"); OnFetch -= page.Data.size(); - OnHold += it->Settle(page); // settle of a dropped page returns 0 and releases its data + OnHold += it->Settle(page, std::move(sharedPageRef)); // settle of a dropped page returns 0 and releases its data ShrinkPages(); } @@ -353,7 +353,7 @@ namespace NFwd { } } - void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept override + void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept override { Stat.Saved += page.Data.size(); @@ -375,7 +375,7 @@ namespace NFwd { } } - it->Settle(page); // settle of a dropped page releases its data + it->Settle(page, std::move(sharedPageRef)); // settle of a dropped page releases its data AdvancePending(levelId); ShrinkPages(level); diff --git a/ydb/core/tablet_flat/flat_fwd_conf.h b/ydb/core/tablet_flat/flat_fwd_conf.h index 94ce5d4c65f..89465a5a95b 100644 --- a/ydb/core/tablet_flat/flat_fwd_conf.h +++ b/ydb/core/tablet_flat/flat_fwd_conf.h @@ -3,12 +3,25 @@ #include <util/system/types.h> #include <util/generic/array_ref.h> #include <util/generic/vector.h> +#include <util/stream/format.h> namespace NKikimr { namespace NTable { namespace NFwd { struct TConf { + void Describe(IOutputStream &out) const noexcept + { + out + << "TConf{" + << "high=" << HumanReadableSize(AheadHi, SF_BYTES) + << ",low=" << HumanReadableSize(AheadLo, SF_BYTES) + << ",edge=" << Edge + << ",tablet=" << Tablet + << ",trace=" << Trace + << "}"; + } + /*_ Cache lines read ahead settings */ ui64 AheadHi = 1; diff --git a/ydb/core/tablet_flat/flat_fwd_env.h b/ydb/core/tablet_flat/flat_fwd_env.h index a9b96c24f40..41d280527c4 100644 --- a/ydb/core/tablet_flat/flat_fwd_env.h +++ b/ydb/core/tablet_flat/flat_fwd_env.h @@ -150,7 +150,7 @@ namespace NFwd { return Get(GetQueue(part, room), ref, EPage::Opaque); } - void DoSave(TIntrusiveConstPtr<IPageCollection> pageCollection, ui64 cookie, TArrayRef<NPageCollection::TLoadedPage> pages) + void DoSave(TIntrusiveConstPtr<IPageCollection> pageCollection, ui64 cookie, TVector<NSharedCache::TEvResult::TLoaded> pages) { const ui32 epoch = ui32(cookie) - Salt; if (epoch < Epoch) { @@ -167,12 +167,14 @@ namespace NFwd { for (auto& page : pages) { auto type = EPage(pageCollection->Page(page.PageId).Type); + auto data = NSharedCache::TPinnedPageRef(page.Page).GetData(); + NPageCollection::TLoadedPage loadedPage{page.PageId, std::move(data)}; if (IsIndexPage(type)) { Y_ABORT_UNLESS(queue.IndexPageCollection->Label() == pageCollection->Label(), "TPart head storage doesn't match with fetch result"); - queue->Fill(page, type); + queue->Fill(loadedPage, std::move(page.Page), type); } else { Y_ABORT_UNLESS(queue.GroupPageCollection->Label() == pageCollection->Label(), "TPart head storage doesn't match with fetch result"); - queue->Fill(page, type); + queue->Fill(loadedPage, std::move(page.Page), type); } } } diff --git a/ydb/core/tablet_flat/flat_fwd_iface.h b/ydb/core/tablet_flat/flat_fwd_iface.h index cbdcd036ca9..1b29e935019 100644 --- a/ydb/core/tablet_flat/flat_fwd_iface.h +++ b/ydb/core/tablet_flat/flat_fwd_iface.h @@ -3,6 +3,7 @@ #include "flat_page_iface.h" #include "flat_sausage_fetch.h" #include "flat_fwd_misc.h" +#include "shared_handle.h" namespace NKikimr { namespace NTable { @@ -32,7 +33,7 @@ namespace NFwd { virtual TResult Get(IPageLoadingQueue *head, TPageId pageId, EPage type, ui64 lower) noexcept = 0; virtual void Forward(IPageLoadingQueue *head, ui64 upper) noexcept = 0; - virtual void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept = 0; + virtual void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept = 0; IPageLoadingQueue* Head = nullptr; /* will be set outside of IPageLoadingLogic impl */ TStat Stat; diff --git a/ydb/core/tablet_flat/flat_fwd_misc.h b/ydb/core/tablet_flat/flat_fwd_misc.h index d61f6f6c532..57415f510c6 100644 --- a/ydb/core/tablet_flat/flat_fwd_misc.h +++ b/ydb/core/tablet_flat/flat_fwd_misc.h @@ -1,6 +1,7 @@ #pragma once #include <util/stream/output.h> +#include <util/stream/format.h> namespace NKikimr { namespace NTable { @@ -18,11 +19,11 @@ namespace NFwd { { out << "TFwd{" - << "fetch=" << Fetch - << ",saved=" << Saved - << ",usage=" << Usage - << ",after=" << After - << ",before=" << Before + << "fetch=" << HumanReadableSize(Fetch, SF_BYTES) + << ",saved=" << HumanReadableSize(Saved, SF_BYTES) + << ",usage=" << HumanReadableSize(Usage, SF_BYTES) + << ",after=" << HumanReadableSize(After, SF_BYTES) + << ",before=" << HumanReadableSize(Before, SF_BYTES) << "}"; } diff --git a/ydb/core/tablet_flat/flat_fwd_page.h b/ydb/core/tablet_flat/flat_fwd_page.h index b245d8d3f1d..c994e0383d1 100644 --- a/ydb/core/tablet_flat/flat_fwd_page.h +++ b/ydb/core/tablet_flat/flat_fwd_page.h @@ -3,6 +3,7 @@ #include "flat_part_iface.h" #include "flat_sausage_fetch.h" #include "flat_fwd_misc.h" +#include "shared_handle.h" namespace NKikimr { namespace NTable { @@ -31,6 +32,7 @@ namespace NFwd { ~TPage() { Y_ABORT_UNLESS(!Data, "Forward cache page is still holds data"); + Y_ABORT_UNLESS(!SharedPageRef, "Forward cache page is still holds data"); } explicit operator bool() const @@ -53,7 +55,7 @@ namespace NFwd { return Data ? &Data : nullptr; } - ui32 Settle(NPageCollection::TLoadedPage &page) noexcept + ui32 Settle(NPageCollection::TLoadedPage &page, NSharedCache::TSharedPageRef ref) noexcept { const auto was = std::exchange(Fetch, EFetch::Done); @@ -67,6 +69,7 @@ namespace NFwd { Y_ABORT("Settling page that is not waiting for any data"); } else { Data = std::move(page.Data); + SharedPageRef = ref; } return Data.size(); @@ -90,6 +93,8 @@ namespace NFwd { { Fetch = Max(Fetch, EFetch::Drop); + SharedPageRef.Drop(); + return std::exchange(Data, { }); } @@ -100,6 +105,7 @@ namespace NFwd { EUsage Usage = EUsage::None; EFetch Fetch = EFetch::None; TSharedData Data; + NSharedCache::TSharedPageRef SharedPageRef; }; } diff --git a/ydb/core/tablet_flat/flat_ops_compact.h b/ydb/core/tablet_flat/flat_ops_compact.h index 3759b755a5c..ce7be59a0ce 100644 --- a/ydb/core/tablet_flat/flat_ops_compact.h +++ b/ydb/core/tablet_flat/flat_ops_compact.h @@ -364,7 +364,7 @@ namespace NTabletFlatExecutor { logl << NFmt::Do(*this) << " end=" << ui32(abort) - << ", " << Blobs << "blobs " << WriteStats.Rows << "r" + << ", " << Blobs << " blobs " << WriteStats.Rows << "r" << " (max " << Conf->Layout.MaxRows << ")" << ", put " << NFmt::If(Spent.Get()); diff --git a/ydb/core/tablet_flat/flat_part_loader.h b/ydb/core/tablet_flat/flat_part_loader.h index 3028e587409..58cb9139673 100644 --- a/ydb/core/tablet_flat/flat_part_loader.h +++ b/ydb/core/tablet_flat/flat_part_loader.h @@ -53,12 +53,20 @@ namespace NTable { Y_ABORT_UNLESS(part == Part, "Unsupported part"); Y_ABORT_UNLESS(groupId.IsMain(), "Unsupported column group"); - if (auto* savedPage = SavedPages.FindPtr(pageId)) { - return savedPage; - } else if (auto* cached = Cache->Lookup(pageId)) { - // Save page in case it's evicted on the next iteration - SavedPages[pageId] = *cached; - return cached; + auto savedPage = SavedPages.find(pageId); + + if (savedPage == SavedPages.end()) { + if (auto cachedPage = Cache->GetPage(pageId); cachedPage) { + if (auto sharedPageRef = cachedPage->SharedBody; sharedPageRef && sharedPageRef.Use()) { + // Save page in case it's evicted on the next iteration + AddSavedPage(pageId, std::move(sharedPageRef)); + savedPage = SavedPages.find(pageId); + } + } + } + + if (savedPage != SavedPages.end()) { + return &savedPage->second; } else { NeedPages.insert(pageId); return nullptr; @@ -86,15 +94,22 @@ namespace NTable { if (cookie == 0 && NeedPages.erase(loaded.PageId)) { auto pageType = Cache->GetPageType(loaded.PageId); bool sticky = NeedIn(pageType) || pageType == EPage::FlatIndex; - SavedPages[loaded.PageId] = NSharedCache::TPinnedPageRef(loaded.Page).GetData(); + AddSavedPage(loaded.PageId, loaded.Page); Cache->Fill(loaded.PageId, std::move(loaded.Page), sticky); } } private: + void AddSavedPage(TPageId pageId, NSharedCache::TSharedPageRef page) noexcept + { + SavedPages[pageId] = NSharedCache::TPinnedPageRef(page).GetData(); + SavedPagesRefs.emplace_back(std::move(page)); + } + const TPart* Part = nullptr; TIntrusivePtr<TCache> Cache; THashMap<TPageId, TSharedData> SavedPages; + TVector<NSharedCache::TSharedPageRef> SavedPagesRefs; THashSet<TPageId> NeedPages; }; diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h index 3d21fc0f845..1960f113233 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.h +++ b/ydb/core/tablet_flat/flat_sausagecache.h @@ -97,11 +97,6 @@ public: return PageMap.size(); } - const TSharedData* Lookup(TPageId pageId) const noexcept { - auto* page = GetPage(pageId); - return page ? page->GetPinnedBody() : nullptr; - } - TPage* GetPage(TPageId pageId) const noexcept { return PageMap[pageId].Get(); } diff --git a/ydb/core/tablet_flat/flat_scan_actor.h b/ydb/core/tablet_flat/flat_scan_actor.h index 5aab5da29fd..08b8f7c4c20 100644 --- a/ydb/core/tablet_flat/flat_scan_actor.h +++ b/ydb/core/tablet_flat/flat_scan_actor.h @@ -624,13 +624,7 @@ namespace NOps { return Terminate(EAbort::Host); } - // TODO: would want to postpone pinning until usage - TVector<NPageCollection::TLoadedPage> pinned(Reserve(msg.Loaded.size())); - for (auto& loaded : msg.Loaded) { - pinned.emplace_back(loaded.PageId, TPinnedPageRef(loaded.Page).GetData()); - } - - Cache->DoSave(std::move(msg.Origin), msg.Cookie, pinned); + Cache->DoSave(std::move(msg.Origin), msg.Cookie, std::move(msg.Loaded)); if (MayProgress()) { Spent->Alter(true /* resource available again */); diff --git a/ydb/core/tablet_flat/flat_scan_spent.h b/ydb/core/tablet_flat/flat_scan_spent.h index 9b173cdc07b..a6fd78ef73f 100644 --- a/ydb/core/tablet_flat/flat_scan_spent.h +++ b/ydb/core/tablet_flat/flat_scan_spent.h @@ -26,9 +26,9 @@ namespace NTable { const auto now = Time->Now(); out - << "Spent{" << NFmt::TDelay(now - Fired) - << " wa " << NFmt::TDelay(Waits + (now - Since)) - << " cnt " << Interrupts << "}"; + << "Spent{time=" << NFmt::TDelay(now - Fired) + << ",wait=" << NFmt::TDelay(Waits + (now - Since)) + << ",interrupts=" << Interrupts << "}"; } void Alter(bool available) noexcept diff --git a/ydb/core/tablet_flat/test/libs/table/test_envs.h b/ydb/core/tablet_flat/test/libs/table/test_envs.h index 2e28d67ed5d..d993b9ca9f2 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_envs.h +++ b/ydb/core/tablet_flat/test/libs/table/test_envs.h @@ -182,11 +182,11 @@ namespace NTest { for (auto &seq: IndexFetch) { NPageCollection::TLoadedPage page(seq, *Store->GetPage(IndexRoom, seq)); - PageLoadingLogic->Fill(page, Store->GetPageType(IndexRoom, seq)); /* will move data */ + PageLoadingLogic->Fill(page, {}, Store->GetPageType(IndexRoom, seq)); /* will move data */ } for (auto &seq: GroupFetch) { NPageCollection::TLoadedPage page(seq, *Store->GetPage(GroupRoom, seq)); - PageLoadingLogic->Fill(page, Store->GetPageType(GroupRoom, seq)); /* will move data */ + PageLoadingLogic->Fill(page, {}, Store->GetPageType(GroupRoom, seq)); /* will move data */ } IndexFetch.clear(); diff --git a/ydb/core/tablet_flat/ut/ut_forward.cpp b/ydb/core/tablet_flat/ut/ut_forward.cpp index 391c64ba891..1fc791b4172 100644 --- a/ydb/core/tablet_flat/ut/ut_forward.cpp +++ b/ydb/core/tablet_flat/ut/ut_forward.cpp @@ -114,7 +114,7 @@ namespace { Shuffle(load.begin(), load.end(), Rnd); for (auto &page : load) { - Cache->Fill(page, EPage::Opaque); + Cache->Fill(page, {}, EPage::Opaque); } UNIT_ASSERT(Cache->Stat.Saved == Cache->Stat.Fetch); @@ -196,7 +196,7 @@ namespace { Shuffle(load.begin(), load.end(), Rnd); for (auto &page : load) { - Cache->Fill(page, Part->GetPageType(page.PageId, {})); + Cache->Fill(page, {}, Part->GetPageType(page.PageId, {})); } UNIT_ASSERT_VALUES_EQUAL_C(Cache->Stat, stat, CurrentStepStr()); @@ -237,7 +237,7 @@ namespace { Shuffle(load.begin(), load.end(), Rnd); for (auto &page : load) { - Cache->Fill(page, Part->GetPageType(page.PageId, {})); + Cache->Fill(page, {}, Part->GetPageType(page.PageId, {})); } UNIT_ASSERT_VALUES_EQUAL_C(Cache->Stat, stat, CurrentStepStr()); diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 0579426fbdc..c067095d89d 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -123,6 +123,7 @@ public: for (auto& loaded : msg->Loaded) { partPages.emplace(pageId, TPinnedPageRef(loaded.Page).GetData()); + PageRefs.emplace_back(std::move(loaded.Page)); } page = partPages.FindPtr(pageId); @@ -230,6 +231,7 @@ private: } THashMap<const TPart*, THashMap<TPageId, TSharedData>> Pages; + TVector<TSharedPageRef> PageRefs; ui64 PagesSize = 0; ui64 CoroutineDeadline; TAutoPtr<TSpent> Spent; |