diff options
author | kungasc <kungasc@yandex-team.com> | 2023-11-20 14:30:52 +0300 |
---|---|---|
committer | kungasc <kungasc@yandex-team.com> | 2023-11-20 17:12:57 +0300 |
commit | 76b3103bf2a60ab088bbb580003cfe737cca4d34 (patch) | |
tree | 10c18385718599f5be72d5ecbf2907ff44c6b7ba | |
parent | ddbdb82c6309b027d872f26a39ca9516fb9f0aa3 (diff) | |
download | ydb-76b3103bf2a60ab088bbb580003cfe737cca4d34.tar.gz |
KIKIMR-20127 Better fix InFly stat
-rw-r--r-- | ydb/core/tablet_flat/flat_bio_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_bio_events.h | 10 | ||||
-rw-r--r-- | ydb/core/tablet_flat/shared_sausagecache.cpp | 47 |
3 files changed, 23 insertions, 40 deletions
diff --git a/ydb/core/tablet_flat/flat_bio_actor.cpp b/ydb/core/tablet_flat/flat_bio_actor.cpp index c198253341..42c7168898 100644 --- a/ydb/core/tablet_flat/flat_bio_actor.cpp +++ b/ydb/core/tablet_flat/flat_bio_actor.cpp @@ -206,12 +206,12 @@ void TBlockIO::Terminate(EStatus code) noexcept << ", " << BlockStates.size() << " pages"; } - auto *ev = new TEvData(std::move(Origin->PageCollection), Origin->Cookie, code); + auto *ev = new TEvData(Origin, code); if (code == NKikimrProto::OK) { size_t index = 0; - ev->Blocks.reserve(Origin->Pages.size()); - for (ui32 pageId : Origin->Pages) { + ev->Blocks.reserve(ev->Fetch->Pages.size()); + for (ui32 pageId : ev->Fetch->Pages) { auto& state = BlockStates.at(index++); ev->Blocks.emplace_back(pageId, std::move(state.Data)); } diff --git a/ydb/core/tablet_flat/flat_bio_events.h b/ydb/core/tablet_flat/flat_bio_events.h index dae56ecca0..91dd48f190 100644 --- a/ydb/core/tablet_flat/flat_bio_events.h +++ b/ydb/core/tablet_flat/flat_bio_events.h @@ -34,10 +34,9 @@ namespace NBlockIO { struct TEvData: public TEventLocal<TEvData, ui32(EEv::Data)> { using EStatus = NKikimrProto::EReplyStatus; - TEvData(TIntrusiveConstPtr<NPageCollection::IPageCollection> origin, ui64 cookie, EStatus status) + TEvData(TAutoPtr<NPageCollection::TFetch> fetch, EStatus status) : Status(status) - , Cookie(cookie) - , Origin(origin) + , Fetch(fetch) { } @@ -46,7 +45,7 @@ namespace NBlockIO { { out << "Blocks{" << Blocks.size() << " pages" - << " " << Origin->Label() + << " " << Fetch->PageCollection->Label() << " " << (Status == NKikimrProto::OK ? "ok" : "fail") << " " << NKikimrProto::EReplyStatus_Name(Status) << "}"; } @@ -60,8 +59,7 @@ namespace NBlockIO { } const EStatus Status; - const ui64 Cookie = Max<ui64>(); - TIntrusiveConstPtr<NPageCollection::IPageCollection> Origin; + TAutoPtr<NPageCollection::TFetch> Fetch; TVector<NPageCollection::TLoadedPage> Blocks; }; diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp index 10e7ca7935..3d4af9eb95 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/shared_sausagecache.cpp @@ -305,8 +305,8 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { TMap<TActorId, TByActorRequest> Requests; - i64 Limit = 0; - i64 InFly = 0; + ui64 Limit = 0; + ui64 InFly = 0; TActorId NextToRequest; }; @@ -673,7 +673,8 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { RequestFromQueue(*queue); } else { AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes); - auto *fetch = new NPageCollection::TFetch(0, waitingRequest->PageCollection, std::move(pagesToRequest)); + // fetch cookie -> requested size + auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest)); NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch); } } @@ -927,14 +928,16 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { auto *msg = ev->Get(); + RemoveInFlyPages(msg->Fetch->Pages.size(), msg->Fetch->Cookie); + if (TRequestQueue *queue = (TRequestQueue *)ev->Cookie) { Y_ABORT_UNLESS(queue == &ScanRequests || queue == &AsyncRequests); - Y_ABORT_UNLESS(queue->InFly >= (i64)msg->Cookie); - queue->InFly -= msg->Cookie; + Y_ABORT_UNLESS(queue->InFly >= msg->Fetch->Cookie); + queue->InFly -= msg->Fetch->Cookie; RequestFromQueue(*queue); } - auto collectionIt = Collections.find(msg->Origin->Label()); + auto collectionIt = Collections.find(msg->Fetch->PageCollection->Label()); if (collectionIt == Collections.end()) return; @@ -951,10 +954,6 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { if (!page || !page->HasMissingBody()) continue; - if (IsInFlyPage(page)) { - RemoveInFlyPage(page); - } - page->Initialize(std::move(paged.Data)); BodyProvided(collection, paged.PageId, page); Evict(Cache.Touch(page)); @@ -1137,12 +1136,6 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { continue; } - if (IsInFlyPage(page)) { - // Request is technically inflight, but response will be ignored - // Pretend request is cancelled for simplicity - RemoveInFlyPage(page); - } - page->Collection = nullptr; ++droppedPagesCount; } @@ -1288,10 +1281,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { } inline void AddInFlyPages(ui64 count, ui64 size) { - StatLoadInFlyBytes += sizeof(TPage) * count + size; + StatLoadInFlyBytes += size; if (Config->Counters) { *Config->Counters->LoadInFlyPages += count; - *Config->Counters->LoadInFlyBytes += sizeof(TPage) * count + size; + *Config->Counters->LoadInFlyBytes += size; } } @@ -1313,23 +1306,15 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { } } - inline void RemoveInFlyPage(const TPage* page) { - if (StatLoadInFlyBytes < sizeof(TPage) + page->Size) { - Y_DEBUG_ABORT_UNLESS(false, "Some race has happened"); - return; - } - - StatLoadInFlyBytes -= sizeof(TPage) + page->Size; + inline void RemoveInFlyPages(ui64 count, ui64 size) { + Y_ABORT_UNLESS(StatLoadInFlyBytes >= size); + StatLoadInFlyBytes -= size; if (Config->Counters) { - --*Config->Counters->LoadInFlyPages; - *Config->Counters->LoadInFlyBytes -= sizeof(TPage) + page->Size; + *Config->Counters->LoadInFlyPages -= count; + *Config->Counters->LoadInFlyBytes -= size; } } - inline bool IsInFlyPage(const TPage* page) const { - return page->State == PageStateRequested || page->State == PageStateRequestedAsync; - } - public: TSharedPageCache(THolder<TSharedPageCacheConfig> config, TIntrusivePtr<TMemObserver> memObserver) : MemObserver(std::move(memObserver)) |