aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-11-20 14:30:52 +0300
committerkungasc <kungasc@yandex-team.com>2023-11-20 17:12:57 +0300
commit76b3103bf2a60ab088bbb580003cfe737cca4d34 (patch)
tree10c18385718599f5be72d5ecbf2907ff44c6b7ba
parentddbdb82c6309b027d872f26a39ca9516fb9f0aa3 (diff)
downloadydb-76b3103bf2a60ab088bbb580003cfe737cca4d34.tar.gz
KIKIMR-20127 Better fix InFly stat
-rw-r--r--ydb/core/tablet_flat/flat_bio_actor.cpp6
-rw-r--r--ydb/core/tablet_flat/flat_bio_events.h10
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.cpp47
3 files changed, 23 insertions, 40 deletions
diff --git a/ydb/core/tablet_flat/flat_bio_actor.cpp b/ydb/core/tablet_flat/flat_bio_actor.cpp
index c198253341..42c7168898 100644
--- a/ydb/core/tablet_flat/flat_bio_actor.cpp
+++ b/ydb/core/tablet_flat/flat_bio_actor.cpp
@@ -206,12 +206,12 @@ void TBlockIO::Terminate(EStatus code) noexcept
<< ", " << BlockStates.size() << " pages";
}
- auto *ev = new TEvData(std::move(Origin->PageCollection), Origin->Cookie, code);
+ auto *ev = new TEvData(Origin, code);
if (code == NKikimrProto::OK) {
size_t index = 0;
- ev->Blocks.reserve(Origin->Pages.size());
- for (ui32 pageId : Origin->Pages) {
+ ev->Blocks.reserve(ev->Fetch->Pages.size());
+ for (ui32 pageId : ev->Fetch->Pages) {
auto& state = BlockStates.at(index++);
ev->Blocks.emplace_back(pageId, std::move(state.Data));
}
diff --git a/ydb/core/tablet_flat/flat_bio_events.h b/ydb/core/tablet_flat/flat_bio_events.h
index dae56ecca0..91dd48f190 100644
--- a/ydb/core/tablet_flat/flat_bio_events.h
+++ b/ydb/core/tablet_flat/flat_bio_events.h
@@ -34,10 +34,9 @@ namespace NBlockIO {
struct TEvData: public TEventLocal<TEvData, ui32(EEv::Data)> {
using EStatus = NKikimrProto::EReplyStatus;
- TEvData(TIntrusiveConstPtr<NPageCollection::IPageCollection> origin, ui64 cookie, EStatus status)
+ TEvData(TAutoPtr<NPageCollection::TFetch> fetch, EStatus status)
: Status(status)
- , Cookie(cookie)
- , Origin(origin)
+ , Fetch(fetch)
{
}
@@ -46,7 +45,7 @@ namespace NBlockIO {
{
out
<< "Blocks{" << Blocks.size() << " pages"
- << " " << Origin->Label()
+ << " " << Fetch->PageCollection->Label()
<< " " << (Status == NKikimrProto::OK ? "ok" : "fail")
<< " " << NKikimrProto::EReplyStatus_Name(Status) << "}";
}
@@ -60,8 +59,7 @@ namespace NBlockIO {
}
const EStatus Status;
- const ui64 Cookie = Max<ui64>();
- TIntrusiveConstPtr<NPageCollection::IPageCollection> Origin;
+ TAutoPtr<NPageCollection::TFetch> Fetch;
TVector<NPageCollection::TLoadedPage> Blocks;
};
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp
index 10e7ca7935..3d4af9eb95 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/shared_sausagecache.cpp
@@ -305,8 +305,8 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
TMap<TActorId, TByActorRequest> Requests;
- i64 Limit = 0;
- i64 InFly = 0;
+ ui64 Limit = 0;
+ ui64 InFly = 0;
TActorId NextToRequest;
};
@@ -673,7 +673,8 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
RequestFromQueue(*queue);
} else {
AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes);
- auto *fetch = new NPageCollection::TFetch(0, waitingRequest->PageCollection, std::move(pagesToRequest));
+ // fetch cookie -> requested size
+ auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest));
NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
}
}
@@ -927,14 +928,16 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
auto *msg = ev->Get();
+ RemoveInFlyPages(msg->Fetch->Pages.size(), msg->Fetch->Cookie);
+
if (TRequestQueue *queue = (TRequestQueue *)ev->Cookie) {
Y_ABORT_UNLESS(queue == &ScanRequests || queue == &AsyncRequests);
- Y_ABORT_UNLESS(queue->InFly >= (i64)msg->Cookie);
- queue->InFly -= msg->Cookie;
+ Y_ABORT_UNLESS(queue->InFly >= msg->Fetch->Cookie);
+ queue->InFly -= msg->Fetch->Cookie;
RequestFromQueue(*queue);
}
- auto collectionIt = Collections.find(msg->Origin->Label());
+ auto collectionIt = Collections.find(msg->Fetch->PageCollection->Label());
if (collectionIt == Collections.end())
return;
@@ -951,10 +954,6 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
if (!page || !page->HasMissingBody())
continue;
- if (IsInFlyPage(page)) {
- RemoveInFlyPage(page);
- }
-
page->Initialize(std::move(paged.Data));
BodyProvided(collection, paged.PageId, page);
Evict(Cache.Touch(page));
@@ -1137,12 +1136,6 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
continue;
}
- if (IsInFlyPage(page)) {
- // Request is technically inflight, but response will be ignored
- // Pretend request is cancelled for simplicity
- RemoveInFlyPage(page);
- }
-
page->Collection = nullptr;
++droppedPagesCount;
}
@@ -1288,10 +1281,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
}
inline void AddInFlyPages(ui64 count, ui64 size) {
- StatLoadInFlyBytes += sizeof(TPage) * count + size;
+ StatLoadInFlyBytes += size;
if (Config->Counters) {
*Config->Counters->LoadInFlyPages += count;
- *Config->Counters->LoadInFlyBytes += sizeof(TPage) * count + size;
+ *Config->Counters->LoadInFlyBytes += size;
}
}
@@ -1313,23 +1306,15 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
}
}
- inline void RemoveInFlyPage(const TPage* page) {
- if (StatLoadInFlyBytes < sizeof(TPage) + page->Size) {
- Y_DEBUG_ABORT_UNLESS(false, "Some race has happened");
- return;
- }
-
- StatLoadInFlyBytes -= sizeof(TPage) + page->Size;
+ inline void RemoveInFlyPages(ui64 count, ui64 size) {
+ Y_ABORT_UNLESS(StatLoadInFlyBytes >= size);
+ StatLoadInFlyBytes -= size;
if (Config->Counters) {
- --*Config->Counters->LoadInFlyPages;
- *Config->Counters->LoadInFlyBytes -= sizeof(TPage) + page->Size;
+ *Config->Counters->LoadInFlyPages -= count;
+ *Config->Counters->LoadInFlyBytes -= size;
}
}
- inline bool IsInFlyPage(const TPage* page) const {
- return page->State == PageStateRequested || page->State == PageStateRequestedAsync;
- }
-
public:
TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver)
: MemObserver(std::move(memObserver))