diff options
author | chertus <azuikov@ydb.tech> | 2022-12-22 18:06:37 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-22 18:06:37 +0300 |
commit | 31be66f1e859a47b67a5a0e7699a48e48ea3126c (patch) | |
tree | 158a30c427c7e89bbeff519a645eaee6b9516693 | |
parent | e640635ed609ad90b26d7241d9aebb801e51b4da (diff) | |
download | ydb-31be66f1e859a47b67a5a0e7699a48e48ea3126c.tar.gz |
better read in BlobCache
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.cpp | 88 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.h | 23 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/compaction_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/eviction_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/export_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/indexing_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 8 |
8 files changed, 107 insertions, 49 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 9585a32443c..e83fded3f2a 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -10,24 +10,26 @@ #include <library/cpp/cache/cache.h> #include <util/string/vector.h> +#include <tuple> namespace NKikimr::NBlobCache { namespace { // Blobs with same tagret can be read in a single request // (e.g. DS blobs from the same tablet residing on the same DS group, or 2 small blobs from the same tablet) - std::pair<ui64, ui32> BlobSource(const TUnifiedBlobId& blobId) { + std::tuple<ui64, ui32, NKikimrBlobStorage::EGetHandleClass> BlobSource(const TUnifiedBlobId& blobId, + NKikimrBlobStorage::EGetHandleClass cls) { Y_VERIFY(blobId.IsValid()); if (blobId.IsDsBlob()) { // Tablet & group restriction - return {blobId.GetTabletId(), blobId.GetDsGroup()}; + return {blobId.GetTabletId(), blobId.GetDsGroup(), cls}; } else if (blobId.IsSmallBlob()) { // Tablet restriction, no group restrictions - return {blobId.GetTabletId(), 0}; + return {blobId.GetTabletId(), 0, cls}; } - return {0, 0}; + return {0, 0, NKikimrBlobStorage::FastRead}; } } @@ -46,12 +48,15 @@ private: struct TReadItem { TBlobRange BlobRange; - bool Fallback{false}; + NKikimrBlobStorage::EGetHandleClass ReadClass = NKikimrBlobStorage::FastRead; + bool PromoteInCache; + bool Fallback; }; static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20; static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20; - static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(5); + static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(30); + static constexpr TDuration FAST_READ_DEADLINE = TDuration::Seconds(10); TLRUCache<TBlobRange, TString> Cache; THashMap<TUnifiedBlobId, THashSet<TBlobRange>> CachedRanges; // List of cached ranges by blob id @@ -169,21 +174,28 @@ private: void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) { const TBlobRange& blobRange = ev->Get()->BlobRange; - const bool promote = ev->Get()->CacheAfterRead; - const bool fallback = ev->Get()->Fallback; + const bool promote = ev->Get()->ReadOptions.CacheAfterRead; + const bool fallback = ev->Get()->ReadOptions.Fallback; + const bool isBackgroud = ev->Get()->ReadOptions.IsBackgroud; LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback); - HandleSingleRangeRead(blobRange, promote, fallback, ev->Sender, ctx); + TReadItem readItem { + .BlobRange = blobRange, + .ReadClass = (isBackgroud ? NKikimrBlobStorage::AsyncRead : NKikimrBlobStorage::FastRead), + .PromoteInCache = promote, + .Fallback = fallback + }; + HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); MakeReadRequests(ctx); } - void HandleSingleRangeRead(const TBlobRange& blobRange, - bool promoteInCache, bool fallback, const TActorId& sender, const TActorContext& ctx) - { + void HandleSingleRangeRead(TReadItem&& readItem, const TActorId& sender, const TActorContext& ctx) { + const TBlobRange& blobRange = readItem.BlobRange; + // Is in cache? - auto it = promoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange); + auto it = readItem.PromoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange); if (it != Cache.End()) { Hits->Inc(); HitsBytes->Add(blobRange.Size); @@ -194,28 +206,36 @@ private: // Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits. // For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize - if (fallback && MaxCacheExternalDataSize == 0) { - promoteInCache = false; + if (readItem.Fallback && MaxCacheExternalDataSize == 0) { + readItem.PromoteInCache = false; } // Is outstanding? auto readIt = OutstandingReads.find(blobRange); if (readIt != OutstandingReads.end()) { readIt->second.Waiting.push_back(sender); - readIt->second.Cache |= promoteInCache; + readIt->second.Cache |= readItem.PromoteInCache; return; } - EnqueueRead(blobRange, promoteInCache, sender, fallback); + EnqueueRead(std::move(readItem), sender); } void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) { const auto& ranges = ev->Get()->BlobRanges; - bool fallback = ev->Get()->Fallback; + const bool promote = ev->Get()->ReadOptions.CacheAfterRead; + const bool fallback = ev->Get()->ReadOptions.Fallback; + const bool isBackgroud = ev->Get()->ReadOptions.IsBackgroud; LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " ")); for (const auto& blobRange : ranges) { - HandleSingleRangeRead(blobRange, ev->Get()->CacheAfterRead, fallback, ev->Sender, ctx); + TReadItem readItem { + .BlobRange = blobRange, + .ReadClass = (isBackgroud ? NKikimrBlobStorage::AsyncRead : NKikimrBlobStorage::FastRead), + .PromoteInCache = promote, + .Fallback = fallback + }; + HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); } MakeReadRequests(ctx); @@ -275,29 +295,32 @@ private: CachedRanges.erase(blobIdIt); } - void EnqueueRead(const TBlobRange& blobRange, bool promoteInCache, const TActorId& sender, bool fallback) { + void EnqueueRead(TReadItem&& readItem, const TActorId& sender) { + const auto& blobRange = readItem.BlobRange; TReadInfo& blobInfo = OutstandingReads[blobRange]; blobInfo.Waiting.push_back(sender); - blobInfo.Cache = promoteInCache; + blobInfo.Cache = readItem.PromoteInCache; LOG_S_DEBUG("Enqueue read range: " << blobRange); - ReadQueue.push_back(TReadItem{blobRange, fallback}); + ReadQueue.emplace_back(std::move(readItem)); ReadsInQueue->Set(ReadQueue.size()); } - void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges, const ui64 cookie, const TActorContext& ctx) { + void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges, + NKikimrBlobStorage::EGetHandleClass readClass, const ui64 cookie, const TActorContext& ctx) + { Y_VERIFY(!blobRanges.empty()); if (blobRanges.front().BlobId.IsSmallBlob()) { SendBatchReadRequestToTablet(blobRanges, cookie, ctx); } else { - SendBatchReadRequestToDS(blobRanges, cookie, ctx); + SendBatchReadRequestToDS(blobRanges, readClass, cookie, ctx); } } void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, - const ui64 cookie, const TActorContext& ctx) + NKikimrBlobStorage::EGetHandleClass readClass, const ui64 cookie, const TActorContext& ctx) { const ui32 dsGroup = blobRanges.front().BlobId.GetDsGroup(); @@ -311,18 +334,18 @@ private: queires[i].Set(blobRanges[i].BlobId.GetLogoBlobId(), blobRanges[i].Offset, blobRanges[i].Size); } - const TInstant deadline = AppData(ctx)->TimeProvider->Now() + DEFAULT_READ_DEADLINE; - const NKikimrBlobStorage::EGetHandleClass handleClass = NKikimrBlobStorage::FastRead; + const TInstant deadline = AppData(ctx)->TimeProvider->Now() + + ((readClass == NKikimrBlobStorage::FastRead) ? FAST_READ_DEADLINE : DEFAULT_READ_DEADLINE); SendToBSProxy(ctx, dsGroup, - new TEvBlobStorage::TEvGet(queires, blobRanges.size(), deadline, handleClass, false), + new TEvBlobStorage::TEvGet(queires, blobRanges.size(), deadline, readClass, false), cookie); ReadRequests->Inc(); } void MakeReadRequests(const TActorContext& ctx) { - THashMap<std::pair<ui64, ui32>, std::vector<TBlobRange>> groupedBlobRanges; + THashMap<std::tuple<ui64, ui32, NKikimrBlobStorage::EGetHandleClass>, std::vector<TBlobRange>> groupedBlobRanges; THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges; while (!ReadQueue.empty()) { @@ -340,7 +363,7 @@ private: } InFlightDataSize += blobRange.Size; - std::pair<ui64, ui32> blobSrc = BlobSource(blobRange.BlobId); + auto blobSrc = BlobSource(blobRange.BlobId, readItem.ReadClass); groupedBlobRanges[blobSrc].push_back(blobRange); } @@ -359,10 +382,11 @@ private: for (auto& [target, rangesGroup] : groupedBlobRanges) { ui64 requestSize = 0; + NKikimrBlobStorage::EGetHandleClass readClass = std::get<2>(target); for (auto& blobRange : rangesGroup) { if (requestSize && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) { - SendBatchReadRequest(CookieToRange[cookie], cookie, ctx); + SendBatchReadRequest(CookieToRange[cookie], readClass, cookie, ctx); cookie = ++ReadCookie; requestSize = 0; } @@ -371,7 +395,7 @@ private: CookieToRange[cookie].emplace_back(std::move(blobRange)); } if (requestSize) { - SendBatchReadRequest(CookieToRange[cookie], cookie, ctx); + SendBatchReadRequest(CookieToRange[cookie], readClass, cookie, ctx); cookie = ++ReadCookie; requestSize = 0; } diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h index afdcc146639..c7960af5564 100644 --- a/ydb/core/tx/columnshard/blob_cache.h +++ b/ydb/core/tx/columnshard/blob_cache.h @@ -20,6 +20,11 @@ using NOlap::TBlobRange; using TLogThis = TCtorLogger<NKikimrServices::BLOB_CACHE>; +struct TReadBlobRangeOptions { + bool CacheAfterRead; + bool Fallback; + bool IsBackgroud; +}; struct TEvBlobCache { enum EEv { @@ -36,13 +41,11 @@ struct TEvBlobCache { struct TEvReadBlobRange : public NActors::TEventLocal<TEvReadBlobRange, EvReadBlobRange> { TBlobRange BlobRange; - bool CacheAfterRead; - bool Fallback; + TReadBlobRangeOptions ReadOptions; - explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cacheResult, bool fallback) + explicit TEvReadBlobRange(const TBlobRange& blobRange, TReadBlobRangeOptions&& opts) : BlobRange(blobRange) - , CacheAfterRead(cacheResult) - , Fallback(fallback) + , ReadOptions(std::move(opts)) {} }; @@ -50,15 +53,13 @@ struct TEvBlobCache { // This is usefull to save IOPs when reading multiple columns from the same blob struct TEvReadBlobRangeBatch : public NActors::TEventLocal<TEvReadBlobRangeBatch, EvReadBlobRangeBatch> { std::vector<TBlobRange> BlobRanges; - bool CacheAfterRead; - bool Fallback; + TReadBlobRangeOptions ReadOptions; - explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cacheResult, bool fallback) + explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, TReadBlobRangeOptions&& opts) : BlobRanges(std::move(blobRanges)) - , CacheAfterRead(cacheResult) - , Fallback(fallback) + , ReadOptions(std::move(opts)) { - if (fallback) { + if (opts.Fallback) { for (const auto& blobRange : BlobRanges) { Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId); } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 60f6915758d..f932fb13f35 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -108,7 +108,13 @@ private: auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; bool fallback = externBlobs && externBlobs->count(blobRange.BlobId); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback)); + + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = true, + .Fallback = fallback, + .IsBackgroud = false + }; + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); ++InFlightReads; InFlightReadBytes += blobRange.Size; return true; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 8b5c25b3349..179f10a7b39 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -107,8 +107,13 @@ private: void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) { Y_VERIFY(!ranges.empty()); + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = false, + .Fallback = isExternal, + .IsBackgroud = true + }; Send(BlobCacheActorId, - new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal)); + new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts))); } void CompactGranules(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index a7affb58703..e379a6b9189 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -104,8 +104,13 @@ private: void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) { Y_VERIFY(!ranges.empty()); + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = false, + .Fallback = isExternal, + .IsBackgroud = true + }; Send(BlobCacheActorId, - new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal)); + new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts))); } void EvictPortions(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index acdb680d268..1ee860ff3a2 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -92,7 +92,13 @@ private: void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { Y_VERIFY(!blobRange.Offset); Y_VERIFY(blobRange.Size); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false)); + + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = false, + .Fallback = false, + .IsBackgroud = true + }; + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); } void SendResultAndDie(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index d385d008dce..a05272ce73c 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -108,7 +108,12 @@ private: void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { Y_VERIFY(blobRange.Size); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false)); + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = false, + .Fallback = false, + .IsBackgroud = true + }; + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); } void Index(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 2193632cad8..a4f05397d71 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -218,7 +218,13 @@ public: auto& externBlobs = ReadMetadata->ExternBlobs; bool fallback = externBlobs && externBlobs->count(blobRange.BlobId); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback)); + + NBlobCache::TReadBlobRangeOptions readOpts { + .CacheAfterRead = true, + .Fallback = fallback, + .IsBackgroud = false + }; + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); } STFUNC(StateWait) { |