diff options
author | stanly <stanly@yandex-team.com> | 2023-05-25 15:55:19 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-25 15:55:19 +0300 |
commit | 0fe15d2368d0dcfd111b233c4efd7fd2f49b40db (patch) | |
tree | b10483bacffbb4cb25d1365ba9359d811c7108f1 | |
parent | 76e6a533c40affcd3251af43d87a5b485fe09431 (diff) | |
download | ydb-0fe15d2368d0dcfd111b233c4efd7fd2f49b40db.tar.gz |
less work if request can be served form cache
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.cpp | 63 |
1 files changed, 29 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 94a36273963..4d1c8ca3325 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -20,12 +20,10 @@ using namespace NActors; class TBlobCache: public TActorBootstrapped<TBlobCache> { private: struct TReadInfo { - bool Cache; // Put in cache after read? - TList<TActorId> Waiting; // List of readers - - TReadInfo() - : Cache(true) - {} + /// List of readers. + TList<TActorId> Waiting; + /// Put in cache after read. + bool Cache{false}; }; struct TReadItem : public TReadBlobRangeOptions { @@ -237,13 +235,12 @@ private: LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback << " sender:" << ev->Sender); - TReadItem readItem(ev->Get()->ReadOptions, blobRange); - HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); - - MakeReadRequests(ctx); + if (!HandleSingleRangeRead(TReadItem(ev->Get()->ReadOptions, blobRange), ev->Sender, ctx)) { + MakeReadRequests(ctx); + } } - void HandleSingleRangeRead(TReadItem&& readItem, const TActorId& sender, const TActorContext& ctx) { + bool HandleSingleRangeRead(TReadItem readItem, const TActorId& sender, const TActorContext& ctx) { const TBlobRange& blobRange = readItem.BlobRange; // Is in cache? @@ -251,7 +248,8 @@ private: if (it != Cache.End()) { Hits->Inc(); HitsBytes->Add(blobRange.Size); - return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true); + SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true); + return true; } LOG_S_DEBUG("Miss cache: " << blobRange << " sender:" << sender); @@ -267,15 +265,25 @@ private: } } - // Is outstanding? - auto readIt = OutstandingReads.find(blobRange); - if (readIt != OutstandingReads.end()) { - readIt->second.Waiting.push_back(sender); - readIt->second.Cache |= readItem.PromoteInCache(); - return; - } + // Update set of outstanding requests. + TReadInfo& blobInfo = OutstandingReads[blobRange]; + const bool inserted = blobInfo.Waiting.empty(); + + blobInfo.Waiting.push_back(sender); + blobInfo.Cache |= readItem.PromoteInCache(); - EnqueueRead(std::move(readItem), sender); + if (inserted) { + LOG_S_DEBUG("Enqueue read range: " << blobRange); + + ReadQueue.emplace_back(std::move(readItem)); + ReadsInQueue->Set(ReadQueue.size()); + // The requested range just put into a read queue. + // Extra work should be done to process the queue. + return false; + } else { + // The requested range was already scheduled for read. + return true; + } } void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) { @@ -286,8 +294,7 @@ private: readOptions.CacheAfterRead = (i64)MaxCacheDataSize && readOptions.CacheAfterRead; for (const auto& blobRange : ranges) { - TReadItem readItem(readOptions, blobRange); - HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); + HandleSingleRangeRead(TReadItem(readOptions, blobRange), ev->Sender, ctx); } MakeReadRequests(ctx); @@ -347,18 +354,6 @@ private: CachedRanges.erase(begin, end); } - void EnqueueRead(TReadItem&& readItem, const TActorId& sender) { - const auto& blobRange = readItem.BlobRange; - TReadInfo& blobInfo = OutstandingReads[blobRange]; - blobInfo.Waiting.push_back(sender); - blobInfo.Cache = readItem.PromoteInCache(); - - LOG_S_DEBUG("Enqueue read range: " << blobRange); - - ReadQueue.emplace_back(std::move(readItem)); - ReadsInQueue->Set(ReadQueue.size()); - } - void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, const ui64 cookie, ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx) { |