aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.com>2023-05-25 15:55:19 +0300
committerstanly <stanly@yandex-team.com>2023-05-25 15:55:19 +0300
commit0fe15d2368d0dcfd111b233c4efd7fd2f49b40db (patch)
treeb10483bacffbb4cb25d1365ba9359d811c7108f1
parent76e6a533c40affcd3251af43d87a5b485fe09431 (diff)
downloadydb-0fe15d2368d0dcfd111b233c4efd7fd2f49b40db.tar.gz
less work if request can be served form cache
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp63
1 files changed, 29 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index 94a36273963..4d1c8ca3325 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -20,12 +20,10 @@ using namespace NActors;
class TBlobCache: public TActorBootstrapped<TBlobCache> {
private:
struct TReadInfo {
- bool Cache; // Put in cache after read?
- TList<TActorId> Waiting; // List of readers
-
- TReadInfo()
- : Cache(true)
- {}
+ /// List of readers.
+ TList<TActorId> Waiting;
+ /// Put in cache after read.
+ bool Cache{false};
};
struct TReadItem : public TReadBlobRangeOptions {
@@ -237,13 +235,12 @@ private:
LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback << " sender:" << ev->Sender);
- TReadItem readItem(ev->Get()->ReadOptions, blobRange);
- HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
-
- MakeReadRequests(ctx);
+ if (!HandleSingleRangeRead(TReadItem(ev->Get()->ReadOptions, blobRange), ev->Sender, ctx)) {
+ MakeReadRequests(ctx);
+ }
}
- void HandleSingleRangeRead(TReadItem&& readItem, const TActorId& sender, const TActorContext& ctx) {
+ bool HandleSingleRangeRead(TReadItem readItem, const TActorId& sender, const TActorContext& ctx) {
const TBlobRange& blobRange = readItem.BlobRange;
// Is in cache?
@@ -251,7 +248,8 @@ private:
if (it != Cache.End()) {
Hits->Inc();
HitsBytes->Add(blobRange.Size);
- return SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true);
+ SendResult(sender, blobRange, NKikimrProto::OK, it.Value(), ctx, true);
+ return true;
}
LOG_S_DEBUG("Miss cache: " << blobRange << " sender:" << sender);
@@ -267,15 +265,25 @@ private:
}
}
- // Is outstanding?
- auto readIt = OutstandingReads.find(blobRange);
- if (readIt != OutstandingReads.end()) {
- readIt->second.Waiting.push_back(sender);
- readIt->second.Cache |= readItem.PromoteInCache();
- return;
- }
+ // Update set of outstanding requests.
+ TReadInfo& blobInfo = OutstandingReads[blobRange];
+ const bool inserted = blobInfo.Waiting.empty();
+
+ blobInfo.Waiting.push_back(sender);
+ blobInfo.Cache |= readItem.PromoteInCache();
- EnqueueRead(std::move(readItem), sender);
+ if (inserted) {
+ LOG_S_DEBUG("Enqueue read range: " << blobRange);
+
+ ReadQueue.emplace_back(std::move(readItem));
+ ReadsInQueue->Set(ReadQueue.size());
+ // The requested range just put into a read queue.
+ // Extra work should be done to process the queue.
+ return false;
+ } else {
+ // The requested range was already scheduled for read.
+ return true;
+ }
}
void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) {
@@ -286,8 +294,7 @@ private:
readOptions.CacheAfterRead = (i64)MaxCacheDataSize && readOptions.CacheAfterRead;
for (const auto& blobRange : ranges) {
- TReadItem readItem(readOptions, blobRange);
- HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
+ HandleSingleRangeRead(TReadItem(readOptions, blobRange), ev->Sender, ctx);
}
MakeReadRequests(ctx);
@@ -347,18 +354,6 @@ private:
CachedRanges.erase(begin, end);
}
- void EnqueueRead(TReadItem&& readItem, const TActorId& sender) {
- const auto& blobRange = readItem.BlobRange;
- TReadInfo& blobInfo = OutstandingReads[blobRange];
- blobInfo.Waiting.push_back(sender);
- blobInfo.Cache = readItem.PromoteInCache();
-
- LOG_S_DEBUG("Enqueue read range: " << blobRange);
-
- ReadQueue.emplace_back(std::move(readItem));
- ReadsInQueue->Set(ReadQueue.size());
- }
-
void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, const ui64 cookie,
ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx)
{