aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-22 18:06:37 +0300
committerchertus <azuikov@ydb.tech>2022-12-22 18:06:37 +0300
commit31be66f1e859a47b67a5a0e7699a48e48ea3126c (patch)
tree158a30c427c7e89bbeff519a645eaee6b9516693
parente640635ed609ad90b26d7241d9aebb801e51b4da (diff)
downloadydb-31be66f1e859a47b67a5a0e7699a48e48ea3126c.tar.gz
better read in BlobCache
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp88
-rw-r--r--ydb/core/tx/columnshard/blob_cache.h23
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp8
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp7
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp7
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp8
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp7
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp8
8 files changed, 107 insertions, 49 deletions
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index 9585a32443c..e83fded3f2a 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -10,24 +10,26 @@
#include <library/cpp/cache/cache.h>
#include <util/string/vector.h>
+#include <tuple>
namespace NKikimr::NBlobCache {
namespace {
// Blobs with same tagret can be read in a single request
// (e.g. DS blobs from the same tablet residing on the same DS group, or 2 small blobs from the same tablet)
- std::pair<ui64, ui32> BlobSource(const TUnifiedBlobId& blobId) {
+ std::tuple<ui64, ui32, NKikimrBlobStorage::EGetHandleClass> BlobSource(const TUnifiedBlobId& blobId,
+ NKikimrBlobStorage::EGetHandleClass cls) {
Y_VERIFY(blobId.IsValid());
if (blobId.IsDsBlob()) {
// Tablet & group restriction
- return {blobId.GetTabletId(), blobId.GetDsGroup()};
+ return {blobId.GetTabletId(), blobId.GetDsGroup(), cls};
} else if (blobId.IsSmallBlob()) {
// Tablet restriction, no group restrictions
- return {blobId.GetTabletId(), 0};
+ return {blobId.GetTabletId(), 0, cls};
}
- return {0, 0};
+ return {0, 0, NKikimrBlobStorage::FastRead};
}
}
@@ -46,12 +48,15 @@ private:
struct TReadItem {
TBlobRange BlobRange;
- bool Fallback{false};
+ NKikimrBlobStorage::EGetHandleClass ReadClass = NKikimrBlobStorage::FastRead;
+ bool PromoteInCache;
+ bool Fallback;
};
static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20;
static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20;
- static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(5);
+ static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(30);
+ static constexpr TDuration FAST_READ_DEADLINE = TDuration::Seconds(10);
TLRUCache<TBlobRange, TString> Cache;
THashMap<TUnifiedBlobId, THashSet<TBlobRange>> CachedRanges; // List of cached ranges by blob id
@@ -169,21 +174,28 @@ private:
void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) {
const TBlobRange& blobRange = ev->Get()->BlobRange;
- const bool promote = ev->Get()->CacheAfterRead;
- const bool fallback = ev->Get()->Fallback;
+ const bool promote = ev->Get()->ReadOptions.CacheAfterRead;
+ const bool fallback = ev->Get()->ReadOptions.Fallback;
+ const bool isBackgroud = ev->Get()->ReadOptions.IsBackgroud;
LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback);
- HandleSingleRangeRead(blobRange, promote, fallback, ev->Sender, ctx);
+ TReadItem readItem {
+ .BlobRange = blobRange,
+ .ReadClass = (isBackgroud ? NKikimrBlobStorage::AsyncRead : NKikimrBlobStorage::FastRead),
+ .PromoteInCache = promote,
+ .Fallback = fallback
+ };
+ HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
MakeReadRequests(ctx);
}
- void HandleSingleRangeRead(const TBlobRange& blobRange,
- bool promoteInCache, bool fallback, const TActorId& sender, const TActorContext& ctx)
- {
+ void HandleSingleRangeRead(TReadItem&& readItem, const TActorId& sender, const TActorContext& ctx) {
+ const TBlobRange& blobRange = readItem.BlobRange;
+
// Is in cache?
- auto it = promoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange);
+ auto it = readItem.PromoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange);
if (it != Cache.End()) {
Hits->Inc();
HitsBytes->Add(blobRange.Size);
@@ -194,28 +206,36 @@ private:
// Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits.
// For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize
- if (fallback && MaxCacheExternalDataSize == 0) {
- promoteInCache = false;
+ if (readItem.Fallback && MaxCacheExternalDataSize == 0) {
+ readItem.PromoteInCache = false;
}
// Is outstanding?
auto readIt = OutstandingReads.find(blobRange);
if (readIt != OutstandingReads.end()) {
readIt->second.Waiting.push_back(sender);
- readIt->second.Cache |= promoteInCache;
+ readIt->second.Cache |= readItem.PromoteInCache;
return;
}
- EnqueueRead(blobRange, promoteInCache, sender, fallback);
+ EnqueueRead(std::move(readItem), sender);
}
void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) {
const auto& ranges = ev->Get()->BlobRanges;
- bool fallback = ev->Get()->Fallback;
+ const bool promote = ev->Get()->ReadOptions.CacheAfterRead;
+ const bool fallback = ev->Get()->ReadOptions.Fallback;
+ const bool isBackgroud = ev->Get()->ReadOptions.IsBackgroud;
LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " "));
for (const auto& blobRange : ranges) {
- HandleSingleRangeRead(blobRange, ev->Get()->CacheAfterRead, fallback, ev->Sender, ctx);
+ TReadItem readItem {
+ .BlobRange = blobRange,
+ .ReadClass = (isBackgroud ? NKikimrBlobStorage::AsyncRead : NKikimrBlobStorage::FastRead),
+ .PromoteInCache = promote,
+ .Fallback = fallback
+ };
+ HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
}
MakeReadRequests(ctx);
@@ -275,29 +295,32 @@ private:
CachedRanges.erase(blobIdIt);
}
- void EnqueueRead(const TBlobRange& blobRange, bool promoteInCache, const TActorId& sender, bool fallback) {
+ void EnqueueRead(TReadItem&& readItem, const TActorId& sender) {
+ const auto& blobRange = readItem.BlobRange;
TReadInfo& blobInfo = OutstandingReads[blobRange];
blobInfo.Waiting.push_back(sender);
- blobInfo.Cache = promoteInCache;
+ blobInfo.Cache = readItem.PromoteInCache;
LOG_S_DEBUG("Enqueue read range: " << blobRange);
- ReadQueue.push_back(TReadItem{blobRange, fallback});
+ ReadQueue.emplace_back(std::move(readItem));
ReadsInQueue->Set(ReadQueue.size());
}
- void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges, const ui64 cookie, const TActorContext& ctx) {
+ void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges,
+ NKikimrBlobStorage::EGetHandleClass readClass, const ui64 cookie, const TActorContext& ctx)
+ {
Y_VERIFY(!blobRanges.empty());
if (blobRanges.front().BlobId.IsSmallBlob()) {
SendBatchReadRequestToTablet(blobRanges, cookie, ctx);
} else {
- SendBatchReadRequestToDS(blobRanges, cookie, ctx);
+ SendBatchReadRequestToDS(blobRanges, readClass, cookie, ctx);
}
}
void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges,
- const ui64 cookie, const TActorContext& ctx)
+ NKikimrBlobStorage::EGetHandleClass readClass, const ui64 cookie, const TActorContext& ctx)
{
const ui32 dsGroup = blobRanges.front().BlobId.GetDsGroup();
@@ -311,18 +334,18 @@ private:
queires[i].Set(blobRanges[i].BlobId.GetLogoBlobId(), blobRanges[i].Offset, blobRanges[i].Size);
}
- const TInstant deadline = AppData(ctx)->TimeProvider->Now() + DEFAULT_READ_DEADLINE;
- const NKikimrBlobStorage::EGetHandleClass handleClass = NKikimrBlobStorage::FastRead;
+ const TInstant deadline = AppData(ctx)->TimeProvider->Now() +
+ ((readClass == NKikimrBlobStorage::FastRead) ? FAST_READ_DEADLINE : DEFAULT_READ_DEADLINE);
SendToBSProxy(ctx,
dsGroup,
- new TEvBlobStorage::TEvGet(queires, blobRanges.size(), deadline, handleClass, false),
+ new TEvBlobStorage::TEvGet(queires, blobRanges.size(), deadline, readClass, false),
cookie);
ReadRequests->Inc();
}
void MakeReadRequests(const TActorContext& ctx) {
- THashMap<std::pair<ui64, ui32>, std::vector<TBlobRange>> groupedBlobRanges;
+ THashMap<std::tuple<ui64, ui32, NKikimrBlobStorage::EGetHandleClass>, std::vector<TBlobRange>> groupedBlobRanges;
THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges;
while (!ReadQueue.empty()) {
@@ -340,7 +363,7 @@ private:
}
InFlightDataSize += blobRange.Size;
- std::pair<ui64, ui32> blobSrc = BlobSource(blobRange.BlobId);
+ auto blobSrc = BlobSource(blobRange.BlobId, readItem.ReadClass);
groupedBlobRanges[blobSrc].push_back(blobRange);
}
@@ -359,10 +382,11 @@ private:
for (auto& [target, rangesGroup] : groupedBlobRanges) {
ui64 requestSize = 0;
+ NKikimrBlobStorage::EGetHandleClass readClass = std::get<2>(target);
for (auto& blobRange : rangesGroup) {
if (requestSize && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) {
- SendBatchReadRequest(CookieToRange[cookie], cookie, ctx);
+ SendBatchReadRequest(CookieToRange[cookie], readClass, cookie, ctx);
cookie = ++ReadCookie;
requestSize = 0;
}
@@ -371,7 +395,7 @@ private:
CookieToRange[cookie].emplace_back(std::move(blobRange));
}
if (requestSize) {
- SendBatchReadRequest(CookieToRange[cookie], cookie, ctx);
+ SendBatchReadRequest(CookieToRange[cookie], readClass, cookie, ctx);
cookie = ++ReadCookie;
requestSize = 0;
}
diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h
index afdcc146639..c7960af5564 100644
--- a/ydb/core/tx/columnshard/blob_cache.h
+++ b/ydb/core/tx/columnshard/blob_cache.h
@@ -20,6 +20,11 @@ using NOlap::TBlobRange;
using TLogThis = TCtorLogger<NKikimrServices::BLOB_CACHE>;
+struct TReadBlobRangeOptions {
+ bool CacheAfterRead;
+ bool Fallback;
+ bool IsBackgroud;
+};
struct TEvBlobCache {
enum EEv {
@@ -36,13 +41,11 @@ struct TEvBlobCache {
struct TEvReadBlobRange : public NActors::TEventLocal<TEvReadBlobRange, EvReadBlobRange> {
TBlobRange BlobRange;
- bool CacheAfterRead;
- bool Fallback;
+ TReadBlobRangeOptions ReadOptions;
- explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cacheResult, bool fallback)
+ explicit TEvReadBlobRange(const TBlobRange& blobRange, TReadBlobRangeOptions&& opts)
: BlobRange(blobRange)
- , CacheAfterRead(cacheResult)
- , Fallback(fallback)
+ , ReadOptions(std::move(opts))
{}
};
@@ -50,15 +53,13 @@ struct TEvBlobCache {
// This is usefull to save IOPs when reading multiple columns from the same blob
struct TEvReadBlobRangeBatch : public NActors::TEventLocal<TEvReadBlobRangeBatch, EvReadBlobRangeBatch> {
std::vector<TBlobRange> BlobRanges;
- bool CacheAfterRead;
- bool Fallback;
+ TReadBlobRangeOptions ReadOptions;
- explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cacheResult, bool fallback)
+ explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, TReadBlobRangeOptions&& opts)
: BlobRanges(std::move(blobRanges))
- , CacheAfterRead(cacheResult)
- , Fallback(fallback)
+ , ReadOptions(std::move(opts))
{
- if (fallback) {
+ if (opts.Fallback) {
for (const auto& blobRange : BlobRanges) {
Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId);
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 60f6915758d..f932fb13f35 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -108,7 +108,13 @@ private:
auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
bool fallback = externBlobs && externBlobs->count(blobRange.BlobId);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback));
+
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = true,
+ .Fallback = fallback,
+ .IsBackgroud = false
+ };
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
++InFlightReads;
InFlightReadBytes += blobRange.Size;
return true;
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 8b5c25b3349..179f10a7b39 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -107,8 +107,13 @@ private:
void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
Y_VERIFY(!ranges.empty());
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = false,
+ .Fallback = isExternal,
+ .IsBackgroud = true
+ };
Send(BlobCacheActorId,
- new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal));
+ new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts)));
}
void CompactGranules(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index a7affb58703..e379a6b9189 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -104,8 +104,13 @@ private:
void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
Y_VERIFY(!ranges.empty());
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = false,
+ .Fallback = isExternal,
+ .IsBackgroud = true
+ };
Send(BlobCacheActorId,
- new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal));
+ new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts)));
}
void EvictPortions(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index acdb680d268..1ee860ff3a2 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -92,7 +92,13 @@ private:
void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
Y_VERIFY(!blobRange.Offset);
Y_VERIFY(blobRange.Size);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false));
+
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = false,
+ .Fallback = false,
+ .IsBackgroud = true
+ };
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
}
void SendResultAndDie(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index d385d008dce..a05272ce73c 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -108,7 +108,12 @@ private:
void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
Y_VERIFY(blobRange.Size);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false));
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = false,
+ .Fallback = false,
+ .IsBackgroud = true
+ };
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
}
void Index(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 2193632cad8..a4f05397d71 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -218,7 +218,13 @@ public:
auto& externBlobs = ReadMetadata->ExternBlobs;
bool fallback = externBlobs && externBlobs->count(blobRange.BlobId);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback));
+
+ NBlobCache::TReadBlobRangeOptions readOpts {
+ .CacheAfterRead = true,
+ .Fallback = fallback,
+ .IsBackgroud = false
+ };
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
}
STFUNC(StateWait) {