aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-13 16:49:25 +0300
committerchertus <azuikov@ydb.tech>2023-03-13 16:49:25 +0300
commita73b09fdf2c05e182c29a0456ebbcf925a05de55 (patch)
tree09cd0260c44c8b559c536d8779f4f427c50ba15f
parent4d8540eb0daa691e068e943d6b9ccd1efb80139c (diff)
downloadydb-a73b09fdf2c05e182c29a0456ebbcf925a05de55.tar.gz
fallback DS NODATA read to tablet in BlobCache
-rw-r--r--ydb/core/tx/columnshard/blob.h7
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp192
-rw-r--r--ydb/core/tx/columnshard/blob_cache.h11
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp4
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h1
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h11
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/inflight_request_tracker.h7
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp10
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp162
17 files changed, 300 insertions, 137 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index 93feda49b8f..f9fb2b4a69d 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -347,6 +347,13 @@ struct TEvictedBlob {
bool IsExternal() const {
return ExternBlob.IsValid();
}
+
+ // Diagnostic dump: numeric State followed by the Blob, ExternBlob and CachedBlob ids.
+ TString ToString() const {
+     TStringBuilder dump;
+     dump << "state: " << static_cast<ui32>(State);
+     dump << " blob: " << Blob.ToStringNew();
+     dump << " extern: " << ExternBlob.ToStringNew();
+     dump << " cached: " << CachedBlob.ToStringNew();
+     return dump;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index d798cfbbc0b..6e38d8ecb03 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -79,6 +79,7 @@ private:
};
static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20;
+ static constexpr i64 MAX_IN_FLIGHT_FALLBACK_BYTES = 100ll << 20;
static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20;
static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(30);
static constexpr TDuration FAST_READ_DEADLINE = TDuration::Seconds(10);
@@ -88,8 +89,8 @@ private:
// It is used to remove all blob ranges from cache when
// it gets a notification that a blob has been deleted
TControlWrapper MaxCacheDataSize;
- TControlWrapper MaxCacheExternalDataSize; // Cache size for extern (i.e. S3) blobs
TControlWrapper MaxInFlightDataSize;
+ TControlWrapper MaxFallbackDataSize; // It's expected to be less then MaxInFlightDataSize
i64 CacheDataSize; // Current size of all blobs in cache
ui64 ReadCookie;
THashMap<ui64, std::vector<TBlobRange>> CookieToRange; // All in-flight requests
@@ -97,6 +98,7 @@ private:
TDeque<TReadItem> ReadQueue; // Reads that are waiting to be sent
// TODO: Consider making per-group queues
i64 InFlightDataSize; // Current size of all in-flight blobs
+ i64 FallbackDataSize; // Current size of in-flight fallback blobs
THashMap<ui64, TActorId> ShardPipes; // TabletId -> PipeClient for small blob read requests
THashMap<ui64, THashSet<ui64>> InFlightTabletRequests; // TabletId -> list to read cookies
@@ -129,11 +131,12 @@ public:
: TActorBootstrapped<TBlobCache>()
, Cache(SIZE_MAX)
, MaxCacheDataSize(maxSize, 0, 1ull << 40)
- , MaxCacheExternalDataSize(0, 0, 1ull << 40)
, MaxInFlightDataSize(Min<i64>(MaxCacheDataSize, MAX_IN_FLIGHT_BYTES), 0, 10ull << 30)
+ , MaxFallbackDataSize(Min<i64>(MaxCacheDataSize / 2, MAX_IN_FLIGHT_FALLBACK_BYTES), 0, 5ull << 30)
, CacheDataSize(0)
, ReadCookie(1)
, InFlightDataSize(0)
+ , FallbackDataSize(0)
, SizeBytes(counters->GetCounter("SizeBytes"))
, SizeBlobs(counters->GetCounter("SizeBlobs"))
, Hits(counters->GetCounter("Hits", true))
@@ -155,8 +158,13 @@ public:
void Bootstrap(const TActorContext& ctx) {
auto& icb = AppData(ctx)->Icb;
icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize");
- icb->RegisterSharedControl(MaxCacheExternalDataSize, "BlobCache.MaxCacheExternalDataSize");
icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize");
+ icb->RegisterSharedControl(MaxFallbackDataSize, "BlobCache.MaxFallbackDataSize");
+
+ LOG_S_DEBUG("MaxCacheDataSize: " << (i64)MaxCacheDataSize
+ << " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize
+ << " InFlightDataSize: " << (i64)InFlightDataSize);
+
Become(&TBlobCache::StateFunc);
ScheduleWakeup();
}
@@ -199,8 +207,8 @@ private:
void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) {
const TBlobRange& blobRange = ev->Get()->BlobRange;
- const bool promote = ev->Get()->ReadOptions.CacheAfterRead;
- const bool fallback = ev->Get()->ReadOptions.Fallback;
+ const bool promote = (i64)MaxCacheDataSize && ev->Get()->ReadOptions.CacheAfterRead;
+ const bool fallback = ev->Get()->ReadOptions.ForceFallback;
LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback);
@@ -223,10 +231,14 @@ private:
Misses->Inc();
- // Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits.
- // For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize
- if (readItem.Fallback && MaxCacheExternalDataSize == 0) {
- readItem.CacheAfterRead = false;
+ // Prevent full cache flushing by exported blobs. Decrease probability of caching depending on cache size.
+ // TODO: better cache strategy
+ if (readItem.ForceFallback && readItem.CacheAfterRead) {
+ if (CacheDataSize > (MaxCacheDataSize / 4) * 3) {
+ readItem.CacheAfterRead = !(ReadCookie % 256);
+ } else if (CacheDataSize > (MaxCacheDataSize / 2)) {
+ readItem.CacheAfterRead = !(ReadCookie % 32);
+ }
}
// Is outstanding?
@@ -244,8 +256,11 @@ private:
const auto& ranges = ev->Get()->BlobRanges;
LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " "));
+ auto& readOptions = ev->Get()->ReadOptions;
+ readOptions.CacheAfterRead = (i64)MaxCacheDataSize && readOptions.CacheAfterRead;
+
for (const auto& blobRange : ranges) {
- TReadItem readItem(ev->Get()->ReadOptions, blobRange);
+ TReadItem readItem(readOptions, blobRange);
HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx);
}
@@ -318,23 +333,9 @@ private:
ReadsInQueue->Set(ReadQueue.size());
}
- void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges,
- TReadItem::EReadVariant readVariant, const ui64 cookie, const TActorContext& ctx)
+ void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, const ui64 cookie,
+ ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx)
{
- Y_VERIFY(!blobRanges.empty());
-
- if (blobRanges.front().BlobId.IsSmallBlob()) {
- SendBatchReadRequestToTablet(blobRanges, cookie, ctx);
- } else {
- SendBatchReadRequestToDS(blobRanges, readVariant, cookie, ctx);
- }
- }
-
- void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges,
- TReadItem::EReadVariant readVariant, const ui64 cookie, const TActorContext& ctx)
- {
- const ui32 dsGroup = blobRanges.front().BlobId.GetDsGroup();
-
LOG_S_DEBUG("Sending read from DS: group: " << dsGroup
<< " ranges: " << JoinStrings(blobRanges.begin(), blobRanges.end(), " ")
<< " cookie: " << cookie);
@@ -364,32 +365,39 @@ private:
return TInstant::Max(); // EReadVariant::DEFAULT_NO_DEADLINE
}
- void MakeReadRequests(const TActorContext& ctx) {
+ void MakeReadRequests(const TActorContext& ctx, THashMap<TUnifiedBlobId, std::vector<TBlobRange>>&& fallbackRanges = {}) {
THashMap<std::tuple<ui64, ui32, TReadItem::EReadVariant>, std::vector<TBlobRange>> groupedBlobRanges;
- THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges;
while (!ReadQueue.empty()) {
const auto& readItem = ReadQueue.front();
const TBlobRange& blobRange = readItem.BlobRange;
- if (readItem.Fallback) {
- // For now it's always possible to add external read cause we must not block ReadQueue by such reads
- // TODO: separate ReadQueue and InFlightDataSize for external reads
- fallbackRanges[blobRange.BlobId].push_back(blobRange);
- } else {
- // NOTE: if queue is not empty, at least 1 in-flight request is allowed
- if (InFlightDataSize && InFlightDataSize >= MaxInFlightDataSize) {
- break;
- }
- InFlightDataSize += blobRange.Size;
+ // NOTE: if queue is not empty, at least 1 in-flight request is allowed
+ if (InFlightDataSize && InFlightDataSize >= MaxInFlightDataSize) {
+ break;
+ }
+ InFlightDataSize += blobRange.Size;
+ SizeBytesInFlight->Add(blobRange.Size);
+ SizeBlobsInFlight->Inc();
+ if (readItem.ForceFallback) {
+ Y_VERIFY(blobRange.BlobId.IsDsBlob());
+
+ if (FallbackDataSize && FallbackDataSize >= MaxFallbackDataSize) {
+ // 1. Do not block DS reads by fallbacks (fallback reads from S3 could be much slower than DS ones)
+ // 2. Limit max fallback data in flight
+ // Requires MaxFallbackDataSize < MaxInFlightDataSize
+ ReadQueue.push_back(readItem);
+ } else {
+ // Tablet cannot read different blobs in fallback now. Group reads by blobId.
+ fallbackRanges[blobRange.BlobId].push_back(blobRange);
+ FallbackDataSize += blobRange.Size;
+ }
+ } else {
auto blobSrc = readItem.BlobSource();
groupedBlobRanges[blobSrc].push_back(blobRange);
}
- SizeBytesInFlight->Add(blobRange.Size);
- SizeBlobsInFlight->Inc();
-
ReadQueue.pop_front();
}
@@ -398,15 +406,35 @@ private:
// We might need to free some space to accomodate the results of new reads
Evict(ctx);
+ std::vector<ui64> tabletReads;
+ tabletReads.reserve(groupedBlobRanges.size() + fallbackRanges.size());
+
+ for (auto& [blobId, ranges] : fallbackRanges) {
+ Y_VERIFY(blobId.IsDsBlob());
+
+ ui64 cookie = ++ReadCookie;
+ CookieToRange[cookie] = std::move(ranges);
+ tabletReads.push_back(cookie);
+ }
+
ui64 cookie = ++ReadCookie;
+ // TODO: fix small blobs mix with dsGroup == 0 (it could be zero in tests)
for (auto& [target, rangesGroup] : groupedBlobRanges) {
ui64 requestSize = 0;
+ ui32 dsGroup = std::get<1>(target);
TReadItem::EReadVariant readVariant = std::get<2>(target);
+ bool isDS = rangesGroup.begin()->BlobId.IsDsBlob();
+
+ std::vector<ui64> dsReads;
for (auto& blobRange : rangesGroup) {
if (requestSize && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) {
- SendBatchReadRequest(CookieToRange[cookie], readVariant, cookie, ctx);
+ if (isDS) {
+ dsReads.push_back(cookie);
+ } else {
+ tabletReads.push_back(cookie);
+ }
cookie = ++ReadCookie;
requestSize = 0;
}
@@ -415,18 +443,21 @@ private:
CookieToRange[cookie].emplace_back(std::move(blobRange));
}
if (requestSize) {
- SendBatchReadRequest(CookieToRange[cookie], readVariant, cookie, ctx);
+ if (isDS) {
+ dsReads.push_back(cookie);
+ } else {
+ tabletReads.push_back(cookie);
+ }
cookie = ++ReadCookie;
requestSize = 0;
}
- }
- for (auto& [blobId, ranges] : fallbackRanges) {
- Y_VERIFY(blobId.IsDsBlob());
- Y_VERIFY(!ranges.empty());
+ for (ui64 cookie : dsReads) {
+ SendBatchReadRequestToDS(CookieToRange[cookie], cookie, dsGroup, readVariant, ctx);
+ }
+ }
- cookie = ++ReadCookie;
- CookieToRange[cookie] = std::move(ranges);
+ for (ui64 cookie : tabletReads) {
SendBatchReadRequestToTablet(CookieToRange[cookie], cookie, ctx);
}
}
@@ -461,12 +492,18 @@ private:
Y_VERIFY(blobRanges.size() == ev->Get()->ResponseSz, "Mismatched number of results for read request!");
+ // We could find blob ranges evicted (NODATA). Try to fallback them to tablet.
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges;
for (size_t i = 0; i < ev->Get()->ResponseSz; ++i) {
const auto& res = ev->Get()->Responses[i];
- ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer, ctx);
+ if (res.Status == NKikimrProto::EReplyStatus::NODATA) {
+ fallbackRanges[blobRanges[i].BlobId].emplace_back(std::move(blobRanges[i]));
+ } else {
+ ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer, ctx);
+ }
}
- MakeReadRequests(ctx);
+ MakeReadRequests(ctx, std::move(fallbackRanges));
}
void ProcessSingleRangeResult(const TBlobRange& blobRange, const ui64 readCookie,
@@ -532,6 +569,8 @@ private:
InFlightTabletRequests[tabletId].insert(cookie);
NTabletPipe::SendData(ctx, ShardPipes[tabletId], ev.release(), cookie);
+
+ ReadRequests->Inc();
}
// Frogets the pipe to the tablet and fails all in-flight requests to it
@@ -581,12 +620,11 @@ private:
}
void Handle(TEvColumnShard::TEvReadBlobRangesResult::TPtr& ev, const TActorContext& ctx) {
- ui64 tabletId = ev->Get()->Record.GetTabletId();
+ const auto& record = ev->Get()->Record;
+ ui64 tabletId = record.GetTabletId();
ui64 readCookie = ev->Cookie;
LOG_S_DEBUG("Got read result from tablet: " << tabletId);
- InFlightTabletRequests[tabletId].erase(readCookie);
-
auto cookieIt = CookieToRange.find(readCookie);
if (cookieIt == CookieToRange.end()) {
// This might only happen in case fo race between response and pipe close
@@ -595,19 +633,45 @@ private:
}
std::vector<TBlobRange> blobRanges = std::move(cookieIt->second);
- CookieToRange.erase(readCookie);
- const auto& record = ev->Get()->Record;
+ Y_VERIFY(record.ResultsSize(), "Zero results for read request!");
+ Y_VERIFY(blobRanges.size() >= record.ResultsSize(), "Mismatched number of results for read request");
- Y_VERIFY(blobRanges.size() == record.ResultsSize(), "Mismatched number of results for read request!");
+ if (blobRanges.size() == record.ResultsSize()) {
+ InFlightTabletRequests[tabletId].erase(readCookie);
+ CookieToRange.erase(readCookie);
+ } else {
+ // Extract blobRanges for returned blobId. Keep others ordered.
+ TString strReturnedBlobId = record.GetResults(0).GetBlobRange().GetBlobId();
+ std::vector<TBlobRange> same;
+ std::vector<TBlobRange> others;
+ same.reserve(record.ResultsSize());
+ others.reserve(blobRanges.size() - record.ResultsSize());
+
+ for (auto&& blobRange : blobRanges) {
+ TString strBlobId = blobRange.BlobId.ToStringNew();
+ if (strBlobId == strReturnedBlobId) {
+ same.emplace_back(std::move(blobRange));
+ } else {
+ others.emplace_back(std::move(blobRange));
+ }
+ }
+ blobRanges.swap(same);
+
+ CookieToRange[readCookie] = std::move(others);
+ }
for (size_t i = 0; i < record.ResultsSize(); ++i) {
const auto& res = record.GetResults(i);
+ const auto& blobRange = blobRanges[i];
+ if (!blobRange.BlobId.IsSmallBlob()) {
+ FallbackDataSize -= blobRange.Size;
+ }
- Y_VERIFY(blobRanges[i].BlobId.ToStringNew() == res.GetBlobRange().GetBlobId());
- Y_VERIFY(blobRanges[i].Offset == res.GetBlobRange().GetOffset());
- Y_VERIFY(blobRanges[i].Size == res.GetBlobRange().GetSize());
- ProcessSingleRangeResult(blobRanges[i], readCookie, res.GetStatus(), res.GetData(), ctx);
+ Y_VERIFY(blobRange.BlobId.ToStringNew() == res.GetBlobRange().GetBlobId());
+ Y_VERIFY(blobRange.Offset == res.GetBlobRange().GetOffset());
+ Y_VERIFY(blobRange.Size == res.GetBlobRange().GetSize());
+ ProcessSingleRangeResult(blobRange, readCookie, res.GetStatus(), res.GetData(), ctx);
}
MakeReadRequests(ctx);
@@ -634,7 +698,11 @@ private:
break;
}
- LOG_S_DEBUG("Evict: " << it.Key() << ";CacheDataSize:" << CacheDataSize << ";InFlightDataSize:" << (i64)InFlightDataSize << ";MaxCacheDataSize:" << (i64)MaxCacheDataSize);
+ LOG_S_DEBUG("Evict: " << it.Key()
+ << " CacheDataSize: " << CacheDataSize
+ << " InFlightDataSize: " << (i64)InFlightDataSize
+ << " MaxCacheDataSize: " << (i64)MaxCacheDataSize
+ << " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize);
{
// Remove the range from list of ranges by blob id
diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h
index 38d68f962e2..fb6ab9db2fd 100644
--- a/ydb/core/tx/columnshard/blob_cache.h
+++ b/ydb/core/tx/columnshard/blob_cache.h
@@ -22,9 +22,16 @@ using TLogThis = TCtorLogger<NKikimrServices::BLOB_CACHE>;
struct TReadBlobRangeOptions {
bool CacheAfterRead;
- bool Fallback;
+ bool ForceFallback;
bool IsBackgroud;
bool WithDeadline = true;
+
+ // Debug rendering of the four option flags as 0/1 values, in declaration order.
+ // The last label used to be misspelled ("dedlined"); it now reads "deadline" so the
+ // output can be searched for by the name of the WithDeadline option.
+ TString ToString() const {
+     return TStringBuilder() << "cache: " << static_cast<ui32>(CacheAfterRead)
+         << " fallback: " << static_cast<ui32>(ForceFallback)
+         << " background: " << static_cast<ui32>(IsBackgroud)
+         << " deadline: " << static_cast<ui32>(WithDeadline);
+ }
};
struct TEvBlobCache {
@@ -60,7 +67,7 @@ struct TEvBlobCache {
: BlobRanges(std::move(blobRanges))
, ReadOptions(std::move(opts))
{
- if (opts.Fallback) {
+ if (opts.ForceFallback) {
for (const auto& blobRange : BlobRanges) {
Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId);
}
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index fb3c6c5e3f3..3f3cfaa47d0 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -674,6 +674,10 @@ void TBlobManager::PerformDelayedDeletes(IBlobManagerDb& db) {
SmallBlobsToDelete.clear();
}
+// Reports whether blobId currently has an entry in BlobsUseCount.
+bool TBlobManager::BlobInUse(const NOlap::TUnifiedBlobId& blobId) const {
+    const auto entries = BlobsUseCount.count(blobId);
+    return entries != 0;
+}
+
void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) {
if (inUse) {
BlobsUseCount[blobId]++;
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index c0dfc48b35d..415590d7aa2 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -249,6 +249,7 @@ public:
// Implementation of IBlobInUseTracker
void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override;
+ bool BlobInUse(const NOlap::TUnifiedBlobId& blobId) const override;
private:
TGenStep FindNewGCBarrier();
diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
index 5f9e0615457..9eef2df27b4 100644
--- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
@@ -169,21 +169,25 @@ void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TAc
} else if (isFallback) {
Y_VERIFY(evictedBlobId->IsValid());
- ui32 status = NKikimrProto::EReplyStatus::ERROR;
-
NKikimrTxColumnShard::TEvictMetadata meta;
auto evicted = BlobManager->GetEvicted(*evictedBlobId, meta);
+ if (!evicted.Blob.IsValid()) {
+ evicted = BlobManager->GetDropped(*evictedBlobId, meta);
+ }
+
if (!evicted.Blob.IsValid() || !evicted.ExternBlob.IsValid()) {
- auto result = MakeErrorResponse(msg, TabletID(), status);
+ LOG_S_NOTICE("No data for blobId " << evictedBlobId->ToStringNew() << " at tablet " << TabletID());
+ auto result = MakeErrorResponse(msg, TabletID(), NKikimrProto::EReplyStatus::NODATA);
ctx.Send(ev->Sender, result.release(), 0, ev->Cookie);
+ return;
}
TString tierName = meta.GetTierName();
- Y_VERIFY(!tierName.empty());
+ Y_VERIFY_S(!tierName.empty(), evicted.ToString());
if (!GetExportedBlob(ctx, ev->Sender, ev->Cookie, tierName, std::move(evicted), std::move(msg.BlobRanges))) {
- auto result = MakeErrorResponse(msg, TabletID(), status);
+ auto result = MakeErrorResponse(msg, TabletID(), NKikimrProto::EReplyStatus::ERROR);
ctx.Send(ev->Sender, result.release(), 0, ev->Cookie);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 706594ad1a4..0eeae1a22c1 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -111,7 +111,7 @@ private:
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = true,
- .Fallback = fallback,
+ .ForceFallback = fallback,
.IsBackgroud = false
};
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 47e6990c39b..1c0c1c936ac 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -1077,6 +1077,12 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const THashSet<NOlap::T
for (const auto& ev : evictedBlobs) {
auto& blobId = ev.Blob;
+ if (BlobManager->BlobInUse(blobId)) {
+ LOG_S_DEBUG("Blob '" << blobId.ToStringNew() << "' in use at tablet " << TabletID());
+ strBlobsDelayed += "'" + blobId.ToStringNew() + "' ";
+ continue;
+ }
+
TEvictMetadata meta;
auto evict = BlobManager->GetDropped(blobId, meta);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index c557a367684..5748ca84064 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -2,6 +2,7 @@
#include "columnshard.h"
#include "columnshard_impl.h"
+#include "blob_cache.h"
#include <ydb/core/formats/arrow_batch_builder.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
@@ -13,6 +14,16 @@
namespace NKikimr::NTxUT {
+// Private events of different actors share the same ES_PRIVATE id range, so a matching
+// type id alone does not identify the event class. The id is checked first and the
+// concrete type is then confirmed via dynamic_cast.
+// Returns nullptr when the type id differs or the cast does not succeed.
+template <class TPrivateEvent>
+inline TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) {
+    const bool sameTypeId = (ev->GetTypeRewrite() == TPrivateEvent::EventType);
+    if (sameTypeId) {
+        return dynamic_cast<TPrivateEvent*>(ev->StaticCastAsLocal<IEventBase>());
+    }
+    return nullptr;
+}
+
class TTester : public TNonCopyable {
public:
static constexpr const ui64 FAKE_SCHEMESHARD_TABLET_ID = 4200;
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 1b5d26ff400..66028849159 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -109,7 +109,7 @@ private:
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = false,
- .Fallback = isExternal,
+ .ForceFallback = isExternal,
.IsBackgroud = true
};
Send(BlobCacheActorId,
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index e379a6b9189..39f116d73b2 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -106,7 +106,7 @@ private:
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = false,
- .Fallback = isExternal,
+ .ForceFallback = isExternal,
.IsBackgroud = true
};
Send(BlobCacheActorId,
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index 5b9cfca6bbb..8dff625bf1d 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -95,7 +95,7 @@ private:
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = false,
- .Fallback = false,
+ .ForceFallback = false,
.IsBackgroud = true
};
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 35f9b7cb314..5412ba3633c 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -110,7 +110,7 @@ private:
Y_VERIFY(blobRange.Size);
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = false,
- .Fallback = false,
+ .ForceFallback = false,
.IsBackgroud = true,
.WithDeadline = false
};
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index cdec4279bc4..72beea01593 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -15,6 +15,7 @@ public:
// until all the references are released.
// NOTE: this ref counts are in-memory only, so the blobs can be deleted if tablet restarts
virtual void SetBlobInUse(const NOlap::TUnifiedBlobId& blobId, bool inUse) = 0;
+ virtual bool BlobInUse(const NOlap::TUnifiedBlobId& blobId) const = 0;
};
using NOlap::TReadMetadata;
@@ -59,6 +60,9 @@ public:
} else {
it->second--;
}
+ for (auto& rec : portion.Records) {
+ blobTracker.SetBlobInUse(rec.BlobRange.BlobId, false);
+ }
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
@@ -97,6 +101,9 @@ private:
for (const auto& portion : readMeta->SelectInfo->Portions) {
const ui64 portionId = portion.Records[0].Portion;
PortionUseCount[portionId]++;
+ for (auto& rec : portion.Records) {
+ blobTracker.SetBlobInUse(rec.BlobRange.BlobId, true);
+ }
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index a4f05397d71..69e8dc37c20 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -221,7 +221,7 @@ public:
NBlobCache::TReadBlobRangeOptions readOpts {
.CacheAfterRead = true,
- .Fallback = fallback,
+ .ForceFallback = fallback,
.IsBackgroud = false
};
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 0771ed4bf6a..af33c4f1a84 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2250,16 +2250,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
- // Private events of different actors reuse the same ES_PRIVATE range
- // So in order to capture the right private event we need to check its type via dynamic_cast
- template <class TPrivateEvent>
- TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle> &ev) {
- if (ev->GetTypeRewrite() != TPrivateEvent::EventType) {
- return nullptr;
- }
- return dynamic_cast<TPrivateEvent*>(ev->StaticCastAsLocal<IEventBase>());
- }
-
void TestCompactionGC(bool enableSmallBlobs) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index a3426017684..a5847ac6772 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -168,6 +168,22 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10
return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId});
}
+// Checks the common header of a read result and returns its data payload.
+// Always asserts: origin == TxTablet0, initiator == TxTablet1, status == SUCCESS.
+// batchNo and finished are compared only when they hold a value; pass std::nullopt
+// to skip the corresponding check. Since finished is an optional<bool>, presence is
+// tested explicitly so it is not confused with the contained flag.
+TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead,
+                      std::optional<ui32> batchNo = 0,
+                      std::optional<bool> finished = true)
+{
+    UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+    UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1);
+    UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+
+    if (batchNo.has_value()) {
+        UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), *batchNo);
+    }
+
+    if (finished.has_value()) {
+        UNIT_ASSERT_EQUAL(resRead.GetFinished(), *finished);
+    }
+
+    return resRead.GetData();
+}
+
static constexpr ui32 PORTION_ROWS = 80 * 1000;
// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
@@ -265,15 +281,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(event);
auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
+ TString data = GetReadResult(resRead);
+ UNIT_ASSERT(data.size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[1]));
+ UNIT_ASSERT(CheckSame(data, schema, PORTION_ROWS, spec.TtlColumn, ts[1]));
}
// Alter TTL
@@ -308,12 +320,8 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(event);
auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetData().size(), 0);
+ TString data = GetReadResult(resRead);
+ UNIT_ASSERT_VALUES_EQUAL(data.size(), 0);
}
// Disable TTL
@@ -366,6 +374,8 @@ public:
ui32 SuccessCounter = 0;
ui32 ErrorsCounter = 0;
ui32 ResponsesCounter = 0;
+ ui32 CaptureReadEvents = 0;
+ std::vector<TAutoPtr<IEventHandle>> CapturedReads;
TString SerializeToString() const {
TStringBuilder sb;
@@ -399,47 +409,60 @@ public:
Cerr << "FINISH_WAITING(" << attemption << "): " << SerializeToString() << Endl;
SuccessCounterStart = SuccessCounter;
}
+
+ // Calls runtime.SimulateSleep in 1-second steps until CaptureReadEvents reaches 0
+ // (it is decremented by the event filter on each captured read) or until 10 seconds
+ // have passed according to TAppData::TimeProvider. Asserts nothing is left to capture.
+ void WaitReadsCaptured(TTestBasicRuntime& runtime) const {
+     const TInstant deadline = TAppData::TimeProvider->Now() + TDuration::Seconds(10);
+     for (;;) {
+         // Same short-circuit order as a `pending && now < deadline` loop condition.
+         if (!CaptureReadEvents || !(TAppData::TimeProvider->Now() < deadline)) {
+             break;
+         }
+         runtime.SimulateSleep(TDuration::Seconds(1));
+     }
+     UNIT_ASSERT_VALUES_EQUAL(CaptureReadEvents, 0);
+ }
+
+ // Sends every event stored in CapturedReads back through the runtime, logging each
+ // one to Cerr, then clears the list. Each stored event must be a TEvReadBlobRange.
+ void ResendCapturedReads(TTestBasicRuntime& runtime) {
+     using TReadEvent = NBlobCache::TEvBlobCache::TEvReadBlobRange;
+
+     for (auto& captured : CapturedReads) {
+         auto* readMsg = TryGetPrivateEvent<TReadEvent>(captured);
+         UNIT_ASSERT(readMsg);
+
+         Cerr << "RESEND " << readMsg->BlobRange.ToString();
+         Cerr << " " << readMsg->ReadOptions.ToString() << Endl;
+
+         // Ownership of the handle passes to the runtime.
+         runtime.Send(captured.Release());
+     }
+     CapturedReads.clear();
+ }
};
class TEventsCounter {
private:
TCountersContainer* Counters = nullptr;
TTestBasicRuntime& Runtime;
- const TActorId Sender;
-
- template <class TPrivateEvent>
- static TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) {
- return ev->CastAsLocal<TPrivateEvent>();
- }
public:
- TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime, const TActorId sender)
+ TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime)
: Counters(&counters)
, Runtime(runtime)
- , Sender(sender)
{
Y_UNUSED(Runtime);
- Y_UNUSED(Sender);
}
bool operator()(TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
TStringBuilder ss;
if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) {
- ss << "EXPORT";
- if (msg->Status == NKikimrProto::OK) {
- ss << "(" << ++Counters->SuccessCounter << "): SUCCESS";
- }
- if (msg->Status == NKikimrProto::ERROR) {
- ss << "(" << ++Counters->ErrorsCounter << "): ERROR";
- }
- if (msg->Status == NKikimrProto::UNKNOWN) {
- ss << "(" << ++Counters->UnknownsCounter << "): UNKNOWN";
- }
+ ss << "EXPORT(" << ++Counters->SuccessCounter << "): " << NKikimrProto::EReplyStatus_Name(msg->Status);
} else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) {
ss << "S3_RESPONSE(put " << ++Counters->ResponsesCounter << "):";
} else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectResponse>(ev)) {
ss << "(" << ++Counters->SuccessCounter << "): DELETE SUCCESS";
ss << "S3_RESPONSE(delete " << ++Counters->ResponsesCounter << "):";
+ } else if (auto* msg = TryGetPrivateEvent<NBlobCache::TEvBlobCache::TEvReadBlobRange>(ev)) {
+ if (Counters->CaptureReadEvents) {
+ Cerr << "CAPTURE " << msg->BlobRange.ToString() << " "
+ << msg->ReadOptions.ToString() << Endl;
+ --Counters->CaptureReadEvents;
+ Counters->CapturedReads.push_back(ev.Release());
+ return true;
+ } else {
+ return false;
+ }
} else {
return false;
}
@@ -467,6 +490,18 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
+ // Disable blob cache. It hides evict-delete, evict-read races.
+ {
+ TAtomic unused;
+ runtime.GetAppData().Icb->SetValue("BlobCache.MaxCacheDataSize", 0, unused);
+ }
+
+ // Disable GC batching so that deleted blobs get collected without a delay
+ {
+ TAtomic unused;
+ runtime.GetAppData().Icb->SetValue("ColumnShardControls.BlobCountToTriggerGC", 1, unused);
+ }
+
//
ui64 metaShard = TTestTxConfig::TxTablet1;
@@ -503,7 +538,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
specRowsBytes.reserve(specs.size());
TCountersContainer counter;
- runtime.SetEventFilter(TEventsCounter(counter, runtime, sender));
+ runtime.SetEventFilter(TEventsCounter(counter, runtime));
for (ui32 i = 0; i < specs.size(); ++i) {
bool hasColdEviction = false;
for (auto&& i : specs[i].Tiers) {
@@ -526,44 +561,71 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
}
+ // Read crossed with eviction (start)
+ {
+ auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId);
+ Proto(read.get()).AddColumnNames(specs[i].TtlColumn);
+
+ counter.CaptureReadEvents = 1; // TODO: we need affected by tiering blob here
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
+ counter.WaitReadsCaptured(runtime);
+ }
+
+ // Eviction
+
TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn);
+
+ Cerr << (hasColdEviction ? "Cold" : "Hot")
+ << " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
+
if (hasColdEviction) {
- Cerr << "Cold tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
if (i > initialEviction) {
counter.WaitEvents(runtime, i, 1, TDuration::Seconds(40));
} else {
counter.WaitEvents(runtime, i, 0, TDuration::Seconds(20));
}
} else {
- Cerr << "Hot tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
counter.WaitEvents(runtime, i, 0, TDuration::Seconds(4));
}
if (reboots) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
}
- // Read
+ // Read crossed with eviction (finish)
+ {
+ counter.ResendCapturedReads(runtime);
+ ui32 numBatches = 0;
+ THashSet<ui32> batchNumbers;
+ while (!numBatches || numBatches < batchNumbers.size()) {
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resRead = Proto(event);
+ TString data = GetReadResult(resRead, {}, {});
+ batchNumbers.insert(resRead.GetBatch());
+ if (resRead.GetFinished()) {
+ numBatches = resRead.GetBatch() + 1;
+ }
+ }
+ }
+
+ // Read data after eviction
- --planStep;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
+ auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId);
Proto(read.get()).AddColumnNames(specs[i].TtlColumn);
-
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
+
specRowsBytes.emplace_back(0, 0);
- ui32 idx = 0;
while (true) {
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
UNIT_ASSERT(event);
auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), idx++);
-
- if (!resRead.GetData().size()) {
+ TString data = GetReadResult(resRead, {}, {});
+ if (!data.size()) {
break;
}
+
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, specs[i].TtlColumn);
@@ -841,12 +903,8 @@ void TestDrop(bool reboots) {
UNIT_ASSERT(event);
auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData().size(), 0);
+ TString data = GetReadResult(resRead);
+ UNIT_ASSERT_EQUAL(data.size(), 0);
}
}