aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungurtsev <kungasc@ydb.tech>2024-12-05 09:45:49 +0100
committerGitHub <noreply@github.com>2024-12-05 09:45:49 +0100
commit7d9ab3e2cb4b049fecdc31901ed993cfd3b5a81e (patch)
treef58f27e121bcb70eb5175cfd1d0d47e72f567f6e
parentd917bcb1d7dfb27990422185c73940d7f248b73d (diff)
downloadydb-7d9ab3e2cb4b049fecdc31901ed993cfd3b5a81e.tar.gz
Turn Scan memory into passive Shared Cache memory (#12295)
-rw-r--r--ydb/core/tablet_flat/flat_fwd_blobs.h4
-rw-r--r--ydb/core/tablet_flat/flat_fwd_cache.h10
-rw-r--r--ydb/core/tablet_flat/flat_fwd_conf.h13
-rw-r--r--ydb/core/tablet_flat/flat_fwd_env.h8
-rw-r--r--ydb/core/tablet_flat/flat_fwd_iface.h3
-rw-r--r--ydb/core/tablet_flat/flat_fwd_misc.h11
-rw-r--r--ydb/core/tablet_flat/flat_fwd_page.h8
-rw-r--r--ydb/core/tablet_flat/flat_ops_compact.h2
-rw-r--r--ydb/core/tablet_flat/flat_part_loader.h29
-rw-r--r--ydb/core/tablet_flat/flat_sausagecache.h5
-rw-r--r--ydb/core/tablet_flat/flat_scan_actor.h8
-rw-r--r--ydb/core/tablet_flat/flat_scan_spent.h6
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_envs.h4
-rw-r--r--ydb/core/tablet_flat/ut/ut_forward.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp2
15 files changed, 74 insertions, 45 deletions
diff --git a/ydb/core/tablet_flat/flat_fwd_blobs.h b/ydb/core/tablet_flat/flat_fwd_blobs.h
index 1fad64cc7d3..8b910633de3 100644
--- a/ydb/core/tablet_flat/flat_fwd_blobs.h
+++ b/ydb/core/tablet_flat/flat_fwd_blobs.h
@@ -54,7 +54,7 @@ namespace NFwd {
Preload(head, upper);
}
- void Fill(NPageCollection::TLoadedPage& page, EPage) noexcept override
+ void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage) noexcept override
{
if (!Pages || page.PageId < Pages.front().PageId) {
Y_ABORT("Blobs fwd cache got page below queue");
@@ -66,7 +66,7 @@ namespace NFwd {
Stat.Saved += page.Data.size();
OnFetch -= page.Data.size();
- OnHold += Lookup(page.PageId).Settle(page);
+ OnHold += Lookup(page.PageId).Settle(page, std::move(sharedPageRef));
Shrink(false /* do not drop loading pages */);
}
diff --git a/ydb/core/tablet_flat/flat_fwd_cache.h b/ydb/core/tablet_flat/flat_fwd_cache.h
index 7f070ff132c..f23ca583ea5 100644
--- a/ydb/core/tablet_flat/flat_fwd_cache.h
+++ b/ydb/core/tablet_flat/flat_fwd_cache.h
@@ -160,7 +160,7 @@ namespace NFwd {
}
}
- void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept override
+ void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept override
{
Stat.Saved += page.Data.size();
@@ -169,7 +169,7 @@ namespace NFwd {
Y_ABORT_UNLESS(page.PageId == IndexPage.PageId);
Index.emplace(page.Data);
Iter = Index->LookupRow(BeginRowId);
- IndexPage.Settle(page);
+ IndexPage.Settle(page, std::move(sharedPageRef));
return;
}
@@ -180,7 +180,7 @@ namespace NFwd {
Y_ABORT_UNLESS(page.Data.size() <= OnFetch, "Forward cache ahead counters is out of sync");
OnFetch -= page.Data.size();
- OnHold += it->Settle(page); // settle of a dropped page returns 0 and releases its data
+ OnHold += it->Settle(page, std::move(sharedPageRef)); // settle of a dropped page returns 0 and releases its data
ShrinkPages();
}
@@ -353,7 +353,7 @@ namespace NFwd {
}
}
- void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept override
+ void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept override
{
Stat.Saved += page.Data.size();
@@ -375,7 +375,7 @@ namespace NFwd {
}
}
- it->Settle(page); // settle of a dropped page releases its data
+ it->Settle(page, std::move(sharedPageRef)); // settle of a dropped page releases its data
AdvancePending(levelId);
ShrinkPages(level);
diff --git a/ydb/core/tablet_flat/flat_fwd_conf.h b/ydb/core/tablet_flat/flat_fwd_conf.h
index 94ce5d4c65f..89465a5a95b 100644
--- a/ydb/core/tablet_flat/flat_fwd_conf.h
+++ b/ydb/core/tablet_flat/flat_fwd_conf.h
@@ -3,12 +3,25 @@
#include <util/system/types.h>
#include <util/generic/array_ref.h>
#include <util/generic/vector.h>
+#include <util/stream/format.h>
namespace NKikimr {
namespace NTable {
namespace NFwd {
struct TConf {
+ void Describe(IOutputStream &out) const noexcept
+ {
+ out
+ << "TConf{"
+ << "high=" << HumanReadableSize(AheadHi, SF_BYTES)
+ << ",low=" << HumanReadableSize(AheadLo, SF_BYTES)
+ << ",edge=" << Edge
+ << ",tablet=" << Tablet
+ << ",trace=" << Trace
+ << "}";
+ }
+
/*_ Cache lines read ahead settings */
ui64 AheadHi = 1;
diff --git a/ydb/core/tablet_flat/flat_fwd_env.h b/ydb/core/tablet_flat/flat_fwd_env.h
index a9b96c24f40..41d280527c4 100644
--- a/ydb/core/tablet_flat/flat_fwd_env.h
+++ b/ydb/core/tablet_flat/flat_fwd_env.h
@@ -150,7 +150,7 @@ namespace NFwd {
return Get(GetQueue(part, room), ref, EPage::Opaque);
}
- void DoSave(TIntrusiveConstPtr<IPageCollection> pageCollection, ui64 cookie, TArrayRef<NPageCollection::TLoadedPage> pages)
+ void DoSave(TIntrusiveConstPtr<IPageCollection> pageCollection, ui64 cookie, TVector<NSharedCache::TEvResult::TLoaded> pages)
{
const ui32 epoch = ui32(cookie) - Salt;
if (epoch < Epoch) {
@@ -167,12 +167,14 @@ namespace NFwd {
for (auto& page : pages) {
auto type = EPage(pageCollection->Page(page.PageId).Type);
+ auto data = NSharedCache::TPinnedPageRef(page.Page).GetData();
+ NPageCollection::TLoadedPage loadedPage{page.PageId, std::move(data)};
if (IsIndexPage(type)) {
Y_ABORT_UNLESS(queue.IndexPageCollection->Label() == pageCollection->Label(), "TPart head storage doesn't match with fetch result");
- queue->Fill(page, type);
+ queue->Fill(loadedPage, std::move(page.Page), type);
} else {
Y_ABORT_UNLESS(queue.GroupPageCollection->Label() == pageCollection->Label(), "TPart head storage doesn't match with fetch result");
- queue->Fill(page, type);
+ queue->Fill(loadedPage, std::move(page.Page), type);
}
}
}
diff --git a/ydb/core/tablet_flat/flat_fwd_iface.h b/ydb/core/tablet_flat/flat_fwd_iface.h
index cbdcd036ca9..1b29e935019 100644
--- a/ydb/core/tablet_flat/flat_fwd_iface.h
+++ b/ydb/core/tablet_flat/flat_fwd_iface.h
@@ -3,6 +3,7 @@
#include "flat_page_iface.h"
#include "flat_sausage_fetch.h"
#include "flat_fwd_misc.h"
+#include "shared_handle.h"
namespace NKikimr {
namespace NTable {
@@ -32,7 +33,7 @@ namespace NFwd {
virtual TResult Get(IPageLoadingQueue *head, TPageId pageId, EPage type, ui64 lower) noexcept = 0;
virtual void Forward(IPageLoadingQueue *head, ui64 upper) noexcept = 0;
- virtual void Fill(NPageCollection::TLoadedPage& page, EPage type) noexcept = 0;
+ virtual void Fill(NPageCollection::TLoadedPage& page, NSharedCache::TSharedPageRef sharedPageRef, EPage type) noexcept = 0;
IPageLoadingQueue* Head = nullptr; /* will be set outside of IPageLoadingLogic impl */
TStat Stat;
diff --git a/ydb/core/tablet_flat/flat_fwd_misc.h b/ydb/core/tablet_flat/flat_fwd_misc.h
index d61f6f6c532..57415f510c6 100644
--- a/ydb/core/tablet_flat/flat_fwd_misc.h
+++ b/ydb/core/tablet_flat/flat_fwd_misc.h
@@ -1,6 +1,7 @@
#pragma once
#include <util/stream/output.h>
+#include <util/stream/format.h>
namespace NKikimr {
namespace NTable {
@@ -18,11 +19,11 @@ namespace NFwd {
{
out
<< "TFwd{"
- << "fetch=" << Fetch
- << ",saved=" << Saved
- << ",usage=" << Usage
- << ",after=" << After
- << ",before=" << Before
+ << "fetch=" << HumanReadableSize(Fetch, SF_BYTES)
+ << ",saved=" << HumanReadableSize(Saved, SF_BYTES)
+ << ",usage=" << HumanReadableSize(Usage, SF_BYTES)
+ << ",after=" << HumanReadableSize(After, SF_BYTES)
+ << ",before=" << HumanReadableSize(Before, SF_BYTES)
<< "}";
}
diff --git a/ydb/core/tablet_flat/flat_fwd_page.h b/ydb/core/tablet_flat/flat_fwd_page.h
index b245d8d3f1d..c994e0383d1 100644
--- a/ydb/core/tablet_flat/flat_fwd_page.h
+++ b/ydb/core/tablet_flat/flat_fwd_page.h
@@ -3,6 +3,7 @@
#include "flat_part_iface.h"
#include "flat_sausage_fetch.h"
#include "flat_fwd_misc.h"
+#include "shared_handle.h"
namespace NKikimr {
namespace NTable {
@@ -31,6 +32,7 @@ namespace NFwd {
~TPage()
{
Y_ABORT_UNLESS(!Data, "Forward cache page is still holds data");
+ Y_ABORT_UNLESS(!SharedPageRef, "Forward cache page is still holds data");
}
explicit operator bool() const
@@ -53,7 +55,7 @@ namespace NFwd {
return Data ? &Data : nullptr;
}
- ui32 Settle(NPageCollection::TLoadedPage &page) noexcept
+ ui32 Settle(NPageCollection::TLoadedPage &page, NSharedCache::TSharedPageRef ref) noexcept
{
const auto was = std::exchange(Fetch, EFetch::Done);
@@ -67,6 +69,7 @@ namespace NFwd {
Y_ABORT("Settling page that is not waiting for any data");
} else {
Data = std::move(page.Data);
+ SharedPageRef = ref;
}
return Data.size();
@@ -90,6 +93,8 @@ namespace NFwd {
{
Fetch = Max(Fetch, EFetch::Drop);
+ SharedPageRef.Drop();
+
return std::exchange(Data, { });
}
@@ -100,6 +105,7 @@ namespace NFwd {
EUsage Usage = EUsage::None;
EFetch Fetch = EFetch::None;
TSharedData Data;
+ NSharedCache::TSharedPageRef SharedPageRef;
};
}
diff --git a/ydb/core/tablet_flat/flat_ops_compact.h b/ydb/core/tablet_flat/flat_ops_compact.h
index 3759b755a5c..ce7be59a0ce 100644
--- a/ydb/core/tablet_flat/flat_ops_compact.h
+++ b/ydb/core/tablet_flat/flat_ops_compact.h
@@ -364,7 +364,7 @@ namespace NTabletFlatExecutor {
logl
<< NFmt::Do(*this) << " end=" << ui32(abort)
- << ", " << Blobs << "blobs " << WriteStats.Rows << "r"
+ << ", " << Blobs << " blobs " << WriteStats.Rows << "r"
<< " (max " << Conf->Layout.MaxRows << ")"
<< ", put " << NFmt::If(Spent.Get());
diff --git a/ydb/core/tablet_flat/flat_part_loader.h b/ydb/core/tablet_flat/flat_part_loader.h
index 3028e587409..58cb9139673 100644
--- a/ydb/core/tablet_flat/flat_part_loader.h
+++ b/ydb/core/tablet_flat/flat_part_loader.h
@@ -53,12 +53,20 @@ namespace NTable {
Y_ABORT_UNLESS(part == Part, "Unsupported part");
Y_ABORT_UNLESS(groupId.IsMain(), "Unsupported column group");
- if (auto* savedPage = SavedPages.FindPtr(pageId)) {
- return savedPage;
- } else if (auto* cached = Cache->Lookup(pageId)) {
- // Save page in case it's evicted on the next iteration
- SavedPages[pageId] = *cached;
- return cached;
+ auto savedPage = SavedPages.find(pageId);
+
+ if (savedPage == SavedPages.end()) {
+ if (auto cachedPage = Cache->GetPage(pageId); cachedPage) {
+ if (auto sharedPageRef = cachedPage->SharedBody; sharedPageRef && sharedPageRef.Use()) {
+ // Save page in case it's evicted on the next iteration
+ AddSavedPage(pageId, std::move(sharedPageRef));
+ savedPage = SavedPages.find(pageId);
+ }
+ }
+ }
+
+ if (savedPage != SavedPages.end()) {
+ return &savedPage->second;
} else {
NeedPages.insert(pageId);
return nullptr;
@@ -86,15 +94,22 @@ namespace NTable {
if (cookie == 0 && NeedPages.erase(loaded.PageId)) {
auto pageType = Cache->GetPageType(loaded.PageId);
bool sticky = NeedIn(pageType) || pageType == EPage::FlatIndex;
- SavedPages[loaded.PageId] = NSharedCache::TPinnedPageRef(loaded.Page).GetData();
+ AddSavedPage(loaded.PageId, loaded.Page);
Cache->Fill(loaded.PageId, std::move(loaded.Page), sticky);
}
}
private:
+ void AddSavedPage(TPageId pageId, NSharedCache::TSharedPageRef page) noexcept
+ {
+ SavedPages[pageId] = NSharedCache::TPinnedPageRef(page).GetData();
+ SavedPagesRefs.emplace_back(std::move(page));
+ }
+
const TPart* Part = nullptr;
TIntrusivePtr<TCache> Cache;
THashMap<TPageId, TSharedData> SavedPages;
+ TVector<NSharedCache::TSharedPageRef> SavedPagesRefs;
THashSet<TPageId> NeedPages;
};
diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h
index 3d21fc0f845..1960f113233 100644
--- a/ydb/core/tablet_flat/flat_sausagecache.h
+++ b/ydb/core/tablet_flat/flat_sausagecache.h
@@ -97,11 +97,6 @@ public:
return PageMap.size();
}
- const TSharedData* Lookup(TPageId pageId) const noexcept {
- auto* page = GetPage(pageId);
- return page ? page->GetPinnedBody() : nullptr;
- }
-
TPage* GetPage(TPageId pageId) const noexcept {
return PageMap[pageId].Get();
}
diff --git a/ydb/core/tablet_flat/flat_scan_actor.h b/ydb/core/tablet_flat/flat_scan_actor.h
index 5aab5da29fd..08b8f7c4c20 100644
--- a/ydb/core/tablet_flat/flat_scan_actor.h
+++ b/ydb/core/tablet_flat/flat_scan_actor.h
@@ -624,13 +624,7 @@ namespace NOps {
return Terminate(EAbort::Host);
}
- // TODO: would want to postpone pinning until usage
- TVector<NPageCollection::TLoadedPage> pinned(Reserve(msg.Loaded.size()));
- for (auto& loaded : msg.Loaded) {
- pinned.emplace_back(loaded.PageId, TPinnedPageRef(loaded.Page).GetData());
- }
-
- Cache->DoSave(std::move(msg.Origin), msg.Cookie, pinned);
+ Cache->DoSave(std::move(msg.Origin), msg.Cookie, std::move(msg.Loaded));
if (MayProgress()) {
Spent->Alter(true /* resource available again */);
diff --git a/ydb/core/tablet_flat/flat_scan_spent.h b/ydb/core/tablet_flat/flat_scan_spent.h
index 9b173cdc07b..a6fd78ef73f 100644
--- a/ydb/core/tablet_flat/flat_scan_spent.h
+++ b/ydb/core/tablet_flat/flat_scan_spent.h
@@ -26,9 +26,9 @@ namespace NTable {
const auto now = Time->Now();
out
- << "Spent{" << NFmt::TDelay(now - Fired)
- << " wa " << NFmt::TDelay(Waits + (now - Since))
- << " cnt " << Interrupts << "}";
+ << "Spent{time=" << NFmt::TDelay(now - Fired)
+ << ",wait=" << NFmt::TDelay(Waits + (now - Since))
+ << ",interrupts=" << Interrupts << "}";
}
void Alter(bool available) noexcept
diff --git a/ydb/core/tablet_flat/test/libs/table/test_envs.h b/ydb/core/tablet_flat/test/libs/table/test_envs.h
index 2e28d67ed5d..d993b9ca9f2 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_envs.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_envs.h
@@ -182,11 +182,11 @@ namespace NTest {
for (auto &seq: IndexFetch) {
NPageCollection::TLoadedPage page(seq, *Store->GetPage(IndexRoom, seq));
- PageLoadingLogic->Fill(page, Store->GetPageType(IndexRoom, seq)); /* will move data */
+ PageLoadingLogic->Fill(page, {}, Store->GetPageType(IndexRoom, seq)); /* will move data */
}
for (auto &seq: GroupFetch) {
NPageCollection::TLoadedPage page(seq, *Store->GetPage(GroupRoom, seq));
- PageLoadingLogic->Fill(page, Store->GetPageType(GroupRoom, seq)); /* will move data */
+ PageLoadingLogic->Fill(page, {}, Store->GetPageType(GroupRoom, seq)); /* will move data */
}
IndexFetch.clear();
diff --git a/ydb/core/tablet_flat/ut/ut_forward.cpp b/ydb/core/tablet_flat/ut/ut_forward.cpp
index 391c64ba891..1fc791b4172 100644
--- a/ydb/core/tablet_flat/ut/ut_forward.cpp
+++ b/ydb/core/tablet_flat/ut/ut_forward.cpp
@@ -114,7 +114,7 @@ namespace {
Shuffle(load.begin(), load.end(), Rnd);
for (auto &page : load) {
- Cache->Fill(page, EPage::Opaque);
+ Cache->Fill(page, {}, EPage::Opaque);
}
UNIT_ASSERT(Cache->Stat.Saved == Cache->Stat.Fetch);
@@ -196,7 +196,7 @@ namespace {
Shuffle(load.begin(), load.end(), Rnd);
for (auto &page : load) {
- Cache->Fill(page, Part->GetPageType(page.PageId, {}));
+ Cache->Fill(page, {}, Part->GetPageType(page.PageId, {}));
}
UNIT_ASSERT_VALUES_EQUAL_C(Cache->Stat, stat, CurrentStepStr());
@@ -237,7 +237,7 @@ namespace {
Shuffle(load.begin(), load.end(), Rnd);
for (auto &page : load) {
- Cache->Fill(page, Part->GetPageType(page.PageId, {}));
+ Cache->Fill(page, {}, Part->GetPageType(page.PageId, {}));
}
UNIT_ASSERT_VALUES_EQUAL_C(Cache->Stat, stat, CurrentStepStr());
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 0579426fbdc..c067095d89d 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -123,6 +123,7 @@ public:
for (auto& loaded : msg->Loaded) {
partPages.emplace(pageId, TPinnedPageRef(loaded.Page).GetData());
+ PageRefs.emplace_back(std::move(loaded.Page));
}
page = partPages.FindPtr(pageId);
@@ -230,6 +231,7 @@ private:
}
THashMap<const TPart*, THashMap<TPageId, TSharedData>> Pages;
+ TVector<TSharedPageRef> PageRefs;
ui64 PagesSize = 0;
ui64 CoroutineDeadline;
TAutoPtr<TSpent> Spent;