author | chertus <azuikov@ydb.tech> | 2023-03-13 16:49:25 +0300
---|---|---
committer | chertus <azuikov@ydb.tech> | 2023-03-13 16:49:25 +0300
commit | a73b09fdf2c05e182c29a0456ebbcf925a05de55 (patch) |
tree | 09cd0260c44c8b559c536d8779f4f427c50ba15f |
parent | 4d8540eb0daa691e068e943d6b9ccd1efb80139c (diff) |
download | ydb-a73b09fdf2c05e182c29a0456ebbcf925a05de55.tar.gz |
fallback DS NODATA read to tablet in BlobCache
-rw-r--r-- | ydb/core/tx/columnshard/blob.h | 7
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.cpp | 192
-rw-r--r-- | ydb/core/tx/columnshard/blob_cache.h | 11
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 4
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 1
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp | 14
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 6
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 11
-rw-r--r-- | ydb/core/tx/columnshard/compaction_actor.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/eviction_actor.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/export_actor.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/indexing_actor.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/inflight_request_tracker.h | 7
-rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 2
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 10
-rw-r--r-- | ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 162
17 files changed, 300 insertions, 137 deletions
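
The core of the change is in blob_cache.cpp in the diff below: when a DS (BlobStorage group) read of a blob range returns NODATA — typically because the blob was evicted/exported to external storage — the cache no longer fails the read but re-requests the range from the owning ColumnShard tablet. The sketch below is a standalone illustration of that routing decision only, not the YDB actor code; `TBlobRange`, `TRangeResult`, `EStatus`, and `SplitDsResults` are simplified stand-ins for the real types and events. It also mirrors the constraint noted in the patch ("Tablet cannot read different blobs in fallback now. Group reads by blobId.") by grouping the NODATA ranges per blob id.

```cpp
// Standalone sketch (hypothetical names): complete successful DS results and
// regroup NODATA ranges by blob id so they can be retried through the tablet.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

enum class EStatus { OK, NODATA, ERROR };

struct TBlobRange {
    std::string BlobId;   // unified blob id (string form for the sketch)
    uint64_t Offset = 0;
    uint64_t Size = 0;
};

struct TRangeResult {
    TBlobRange Range;
    EStatus Status = EStatus::ERROR;
    std::string Buffer;   // payload for OK results
};

// Ranges that must be retried via the tablet, grouped by blob id
// (the tablet-side fallback reads one blob per request).
using TFallbackRanges = std::map<std::string, std::vector<TBlobRange>>;

TFallbackRanges SplitDsResults(const std::vector<TRangeResult>& results) {
    TFallbackRanges fallback;
    for (const auto& res : results) {
        switch (res.Status) {
        case EStatus::OK:
            // In the real cache: store in the LRU and reply to waiting readers.
            std::cout << "done " << res.Range.BlobId << "@" << res.Range.Offset << "\n";
            break;
        case EStatus::NODATA:
            // Blob is gone from DS (e.g. exported to S3): retry via the tablet.
            fallback[res.Range.BlobId].push_back(res.Range);
            break;
        case EStatus::ERROR:
            std::cout << "error " << res.Range.BlobId << "\n";
            break;
        }
    }
    return fallback;
}

int main() {
    std::vector<TRangeResult> results = {
        {{"blobA", 0, 4096}, EStatus::OK, "..."},
        {{"blobB", 0, 1024}, EStatus::NODATA, ""},
        {{"blobB", 1024, 1024}, EStatus::NODATA, ""},
    };
    for (const auto& [blobId, ranges] : SplitDsResults(results)) {
        std::cout << "fallback to tablet: " << blobId << " (" << ranges.size() << " ranges)\n";
    }
}
```
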
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index 93feda49b8f..f9fb2b4a69d 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -347,6 +347,13 @@ struct TEvictedBlob { bool IsExternal() const { return ExternBlob.IsValid(); } + + TString ToString() const { + return TStringBuilder() << "state: " << (ui32)State + << " blob: " << Blob.ToStringNew() + << " extern: " << ExternBlob.ToStringNew() + << " cached: " << CachedBlob.ToStringNew(); + } }; } diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index d798cfbbc0b..6e38d8ecb03 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -79,6 +79,7 @@ private: }; static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20; + static constexpr i64 MAX_IN_FLIGHT_FALLBACK_BYTES = 100ll << 20; static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20; static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(30); static constexpr TDuration FAST_READ_DEADLINE = TDuration::Seconds(10); @@ -88,8 +89,8 @@ private: // It is used to remove all blob ranges from cache when // it gets a notification that a blob has been deleted TControlWrapper MaxCacheDataSize; - TControlWrapper MaxCacheExternalDataSize; // Cache size for extern (i.e. S3) blobs TControlWrapper MaxInFlightDataSize; + TControlWrapper MaxFallbackDataSize; // It's expected to be less then MaxInFlightDataSize i64 CacheDataSize; // Current size of all blobs in cache ui64 ReadCookie; THashMap<ui64, std::vector<TBlobRange>> CookieToRange; // All in-flight requests @@ -97,6 +98,7 @@ private: TDeque<TReadItem> ReadQueue; // Reads that are waiting to be sent // TODO: Consider making per-group queues i64 InFlightDataSize; // Current size of all in-flight blobs + i64 FallbackDataSize; // Current size of in-flight fallback blobs THashMap<ui64, TActorId> ShardPipes; // TabletId -> PipeClient for small blob read requests THashMap<ui64, THashSet<ui64>> InFlightTabletRequests; // TabletId -> list to read cookies @@ -129,11 +131,12 @@ public: : TActorBootstrapped<TBlobCache>() , Cache(SIZE_MAX) , MaxCacheDataSize(maxSize, 0, 1ull << 40) - , MaxCacheExternalDataSize(0, 0, 1ull << 40) , MaxInFlightDataSize(Min<i64>(MaxCacheDataSize, MAX_IN_FLIGHT_BYTES), 0, 10ull << 30) + , MaxFallbackDataSize(Min<i64>(MaxCacheDataSize / 2, MAX_IN_FLIGHT_FALLBACK_BYTES), 0, 5ull << 30) , CacheDataSize(0) , ReadCookie(1) , InFlightDataSize(0) + , FallbackDataSize(0) , SizeBytes(counters->GetCounter("SizeBytes")) , SizeBlobs(counters->GetCounter("SizeBlobs")) , Hits(counters->GetCounter("Hits", true)) @@ -155,8 +158,13 @@ public: void Bootstrap(const TActorContext& ctx) { auto& icb = AppData(ctx)->Icb; icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize"); - icb->RegisterSharedControl(MaxCacheExternalDataSize, "BlobCache.MaxCacheExternalDataSize"); icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize"); + icb->RegisterSharedControl(MaxFallbackDataSize, "BlobCache.MaxFallbackDataSize"); + + LOG_S_DEBUG("MaxCacheDataSize: " << (i64)MaxCacheDataSize + << " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize + << " InFlightDataSize: " << (i64)InFlightDataSize); + Become(&TBlobCache::StateFunc); ScheduleWakeup(); } @@ -199,8 +207,8 @@ private: void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) { const TBlobRange& blobRange = ev->Get()->BlobRange; - const bool promote = ev->Get()->ReadOptions.CacheAfterRead; - const 
bool fallback = ev->Get()->ReadOptions.Fallback; + const bool promote = (i64)MaxCacheDataSize && ev->Get()->ReadOptions.CacheAfterRead; + const bool fallback = ev->Get()->ReadOptions.ForceFallback; LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback); @@ -223,10 +231,14 @@ private: Misses->Inc(); - // Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits. - // For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize - if (readItem.Fallback && MaxCacheExternalDataSize == 0) { - readItem.CacheAfterRead = false; + // Prevent full cache flushing by exported blobs. Decrease propability of caching depending on cache size. + // TODO: better cache strategy + if (readItem.ForceFallback && readItem.CacheAfterRead) { + if (CacheDataSize > (MaxCacheDataSize / 4) * 3) { + readItem.CacheAfterRead = !(ReadCookie % 256); + } else if (CacheDataSize > (MaxCacheDataSize / 2)) { + readItem.CacheAfterRead = !(ReadCookie % 32); + } } // Is outstanding? @@ -244,8 +256,11 @@ private: const auto& ranges = ev->Get()->BlobRanges; LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " ")); + auto& readOptions = ev->Get()->ReadOptions; + readOptions.CacheAfterRead = (i64)MaxCacheDataSize && readOptions.CacheAfterRead; + for (const auto& blobRange : ranges) { - TReadItem readItem(ev->Get()->ReadOptions, blobRange); + TReadItem readItem(readOptions, blobRange); HandleSingleRangeRead(std::move(readItem), ev->Sender, ctx); } @@ -318,23 +333,9 @@ private: ReadsInQueue->Set(ReadQueue.size()); } - void SendBatchReadRequest(const std::vector<TBlobRange>& blobRanges, - TReadItem::EReadVariant readVariant, const ui64 cookie, const TActorContext& ctx) + void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, const ui64 cookie, + ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx) { - Y_VERIFY(!blobRanges.empty()); - - if (blobRanges.front().BlobId.IsSmallBlob()) { - SendBatchReadRequestToTablet(blobRanges, cookie, ctx); - } else { - SendBatchReadRequestToDS(blobRanges, readVariant, cookie, ctx); - } - } - - void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, - TReadItem::EReadVariant readVariant, const ui64 cookie, const TActorContext& ctx) - { - const ui32 dsGroup = blobRanges.front().BlobId.GetDsGroup(); - LOG_S_DEBUG("Sending read from DS: group: " << dsGroup << " ranges: " << JoinStrings(blobRanges.begin(), blobRanges.end(), " ") << " cookie: " << cookie); @@ -364,32 +365,39 @@ private: return TInstant::Max(); // EReadVariant::DEFAULT_NO_DEADLINE } - void MakeReadRequests(const TActorContext& ctx) { + void MakeReadRequests(const TActorContext& ctx, THashMap<TUnifiedBlobId, std::vector<TBlobRange>>&& fallbackRanges = {}) { THashMap<std::tuple<ui64, ui32, TReadItem::EReadVariant>, std::vector<TBlobRange>> groupedBlobRanges; - THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges; while (!ReadQueue.empty()) { const auto& readItem = ReadQueue.front(); const TBlobRange& blobRange = readItem.BlobRange; - if (readItem.Fallback) { - // For now it's always possible to add external read cause we must not block ReadQueue by such reads - // TODO: separate ReadQueue and InFlightDataSize for external reads - fallbackRanges[blobRange.BlobId].push_back(blobRange); - } else { - // NOTE: if queue is not empty, at least 1 in-flight request is allowed - if (InFlightDataSize && InFlightDataSize >= 
MaxInFlightDataSize) { - break; - } - InFlightDataSize += blobRange.Size; + // NOTE: if queue is not empty, at least 1 in-flight request is allowed + if (InFlightDataSize && InFlightDataSize >= MaxInFlightDataSize) { + break; + } + InFlightDataSize += blobRange.Size; + SizeBytesInFlight->Add(blobRange.Size); + SizeBlobsInFlight->Inc(); + if (readItem.ForceFallback) { + Y_VERIFY(blobRange.BlobId.IsDsBlob()); + + if (FallbackDataSize && FallbackDataSize >= MaxFallbackDataSize) { + // 1. Do not block DS reads by fallbacks (fallback reads form S3 could be much slower then DS ones) + // 2. Limit max fallback data in flight + // Requires MaxFallbackDataSize < MaxInFlightDataSize + ReadQueue.push_back(readItem); + } else { + // Tablet cannot read different blobs in fallback now. Group reads by blobId. + fallbackRanges[blobRange.BlobId].push_back(blobRange); + FallbackDataSize += blobRange.Size; + } + } else { auto blobSrc = readItem.BlobSource(); groupedBlobRanges[blobSrc].push_back(blobRange); } - SizeBytesInFlight->Add(blobRange.Size); - SizeBlobsInFlight->Inc(); - ReadQueue.pop_front(); } @@ -398,15 +406,35 @@ private: // We might need to free some space to accomodate the results of new reads Evict(ctx); + std::vector<ui64> tabletReads; + tabletReads.reserve(groupedBlobRanges.size() + fallbackRanges.size()); + + for (auto& [blobId, ranges] : fallbackRanges) { + Y_VERIFY(blobId.IsDsBlob()); + + ui64 cookie = ++ReadCookie; + CookieToRange[cookie] = std::move(ranges); + tabletReads.push_back(cookie); + } + ui64 cookie = ++ReadCookie; + // TODO: fix small blobs mix with dsGroup == 0 (it could be zero in tests) for (auto& [target, rangesGroup] : groupedBlobRanges) { ui64 requestSize = 0; + ui32 dsGroup = std::get<1>(target); TReadItem::EReadVariant readVariant = std::get<2>(target); + bool isDS = rangesGroup.begin()->BlobId.IsDsBlob(); + + std::vector<ui64> dsReads; for (auto& blobRange : rangesGroup) { if (requestSize && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) { - SendBatchReadRequest(CookieToRange[cookie], readVariant, cookie, ctx); + if (isDS) { + dsReads.push_back(cookie); + } else { + tabletReads.push_back(cookie); + } cookie = ++ReadCookie; requestSize = 0; } @@ -415,18 +443,21 @@ private: CookieToRange[cookie].emplace_back(std::move(blobRange)); } if (requestSize) { - SendBatchReadRequest(CookieToRange[cookie], readVariant, cookie, ctx); + if (isDS) { + dsReads.push_back(cookie); + } else { + tabletReads.push_back(cookie); + } cookie = ++ReadCookie; requestSize = 0; } - } - for (auto& [blobId, ranges] : fallbackRanges) { - Y_VERIFY(blobId.IsDsBlob()); - Y_VERIFY(!ranges.empty()); + for (ui64 cookie : dsReads) { + SendBatchReadRequestToDS(CookieToRange[cookie], cookie, dsGroup, readVariant, ctx); + } + } - cookie = ++ReadCookie; - CookieToRange[cookie] = std::move(ranges); + for (ui64 cookie : tabletReads) { SendBatchReadRequestToTablet(CookieToRange[cookie], cookie, ctx); } } @@ -461,12 +492,18 @@ private: Y_VERIFY(blobRanges.size() == ev->Get()->ResponseSz, "Mismatched number of results for read request!"); + // We could find blob ranges evicted (NODATA). Try to fallback them to tablet. 
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges; for (size_t i = 0; i < ev->Get()->ResponseSz; ++i) { const auto& res = ev->Get()->Responses[i]; - ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer, ctx); + if (res.Status == NKikimrProto::EReplyStatus::NODATA) { + fallbackRanges[blobRanges[i].BlobId].emplace_back(std::move(blobRanges[i])); + } else { + ProcessSingleRangeResult(blobRanges[i], readCookie, res.Status, res.Buffer, ctx); + } } - MakeReadRequests(ctx); + MakeReadRequests(ctx, std::move(fallbackRanges)); } void ProcessSingleRangeResult(const TBlobRange& blobRange, const ui64 readCookie, @@ -532,6 +569,8 @@ private: InFlightTabletRequests[tabletId].insert(cookie); NTabletPipe::SendData(ctx, ShardPipes[tabletId], ev.release(), cookie); + + ReadRequests->Inc(); } // Frogets the pipe to the tablet and fails all in-flight requests to it @@ -581,12 +620,11 @@ private: } void Handle(TEvColumnShard::TEvReadBlobRangesResult::TPtr& ev, const TActorContext& ctx) { - ui64 tabletId = ev->Get()->Record.GetTabletId(); + const auto& record = ev->Get()->Record; + ui64 tabletId = record.GetTabletId(); ui64 readCookie = ev->Cookie; LOG_S_DEBUG("Got read result from tablet: " << tabletId); - InFlightTabletRequests[tabletId].erase(readCookie); - auto cookieIt = CookieToRange.find(readCookie); if (cookieIt == CookieToRange.end()) { // This might only happen in case fo race between response and pipe close @@ -595,19 +633,45 @@ private: } std::vector<TBlobRange> blobRanges = std::move(cookieIt->second); - CookieToRange.erase(readCookie); - const auto& record = ev->Get()->Record; + Y_VERIFY(record.ResultsSize(), "Zero results for read request!"); + Y_VERIFY(blobRanges.size() >= record.ResultsSize(), "Mismatched number of results for read request"); - Y_VERIFY(blobRanges.size() == record.ResultsSize(), "Mismatched number of results for read request!"); + if (blobRanges.size() == record.ResultsSize()) { + InFlightTabletRequests[tabletId].erase(readCookie); + CookieToRange.erase(readCookie); + } else { + // Extract blobRanges for returned blobId. Keep others ordered. 
+ TString strReturnedBlobId = record.GetResults(0).GetBlobRange().GetBlobId(); + std::vector<TBlobRange> same; + std::vector<TBlobRange> others; + same.reserve(record.ResultsSize()); + others.reserve(blobRanges.size() - record.ResultsSize()); + + for (auto&& blobRange : blobRanges) { + TString strBlobId = blobRange.BlobId.ToStringNew(); + if (strBlobId == strReturnedBlobId) { + same.emplace_back(std::move(blobRange)); + } else { + others.emplace_back(std::move(blobRange)); + } + } + blobRanges.swap(same); + + CookieToRange[readCookie] = std::move(others); + } for (size_t i = 0; i < record.ResultsSize(); ++i) { const auto& res = record.GetResults(i); + const auto& blobRange = blobRanges[i]; + if (!blobRange.BlobId.IsSmallBlob()) { + FallbackDataSize -= blobRange.Size; + } - Y_VERIFY(blobRanges[i].BlobId.ToStringNew() == res.GetBlobRange().GetBlobId()); - Y_VERIFY(blobRanges[i].Offset == res.GetBlobRange().GetOffset()); - Y_VERIFY(blobRanges[i].Size == res.GetBlobRange().GetSize()); - ProcessSingleRangeResult(blobRanges[i], readCookie, res.GetStatus(), res.GetData(), ctx); + Y_VERIFY(blobRange.BlobId.ToStringNew() == res.GetBlobRange().GetBlobId()); + Y_VERIFY(blobRange.Offset == res.GetBlobRange().GetOffset()); + Y_VERIFY(blobRange.Size == res.GetBlobRange().GetSize()); + ProcessSingleRangeResult(blobRange, readCookie, res.GetStatus(), res.GetData(), ctx); } MakeReadRequests(ctx); @@ -634,7 +698,11 @@ private: break; } - LOG_S_DEBUG("Evict: " << it.Key() << ";CacheDataSize:" << CacheDataSize << ";InFlightDataSize:" << (i64)InFlightDataSize << ";MaxCacheDataSize:" << (i64)MaxCacheDataSize); + LOG_S_DEBUG("Evict: " << it.Key() + << " CacheDataSize: " << CacheDataSize + << " InFlightDataSize: " << (i64)InFlightDataSize + << " MaxCacheDataSize: " << (i64)MaxCacheDataSize + << " MaxFallbackDataSize: " << (i64)MaxFallbackDataSize); { // Remove the range from list of ranges by blob id diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h index 38d68f962e2..fb6ab9db2fd 100644 --- a/ydb/core/tx/columnshard/blob_cache.h +++ b/ydb/core/tx/columnshard/blob_cache.h @@ -22,9 +22,16 @@ using TLogThis = TCtorLogger<NKikimrServices::BLOB_CACHE>; struct TReadBlobRangeOptions { bool CacheAfterRead; - bool Fallback; + bool ForceFallback; bool IsBackgroud; bool WithDeadline = true; + + TString ToString() const { + return TStringBuilder() << "cache: " << (ui32)CacheAfterRead + << " fallback: " << (ui32)ForceFallback + << " background: " << (ui32)IsBackgroud + << " dedlined: " << (ui32)WithDeadline; + } }; struct TEvBlobCache { @@ -60,7 +67,7 @@ struct TEvBlobCache { : BlobRanges(std::move(blobRanges)) , ReadOptions(std::move(opts)) { - if (opts.Fallback) { + if (opts.ForceFallback) { for (const auto& blobRange : BlobRanges) { Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId); } diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index fb3c6c5e3f3..3f3cfaa47d0 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -674,6 +674,10 @@ void TBlobManager::PerformDelayedDeletes(IBlobManagerDb& db) { SmallBlobsToDelete.clear(); } +bool TBlobManager::BlobInUse(const NOlap::TUnifiedBlobId& blobId) const { + return BlobsUseCount.count(blobId); +} + void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { if (inUse) { BlobsUseCount[blobId]++; diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index c0dfc48b35d..415590d7aa2 100644 --- 
a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -249,6 +249,7 @@ public: // Implementation of IBlobInUseTracker void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override; + bool BlobInUse(const NOlap::TUnifiedBlobId& blobId) const override; private: TGenStep FindNewGCBarrier(); diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp index 5f9e0615457..9eef2df27b4 100644 --- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp @@ -169,21 +169,25 @@ void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TAc } else if (isFallback) { Y_VERIFY(evictedBlobId->IsValid()); - ui32 status = NKikimrProto::EReplyStatus::ERROR; - NKikimrTxColumnShard::TEvictMetadata meta; auto evicted = BlobManager->GetEvicted(*evictedBlobId, meta); + if (!evicted.Blob.IsValid()) { + evicted = BlobManager->GetDropped(*evictedBlobId, meta); + } + if (!evicted.Blob.IsValid() || !evicted.ExternBlob.IsValid()) { - auto result = MakeErrorResponse(msg, TabletID(), status); + LOG_S_NOTICE("No data for blobId " << evictedBlobId->ToStringNew() << " at tablet " << TabletID()); + auto result = MakeErrorResponse(msg, TabletID(), NKikimrProto::EReplyStatus::NODATA); ctx.Send(ev->Sender, result.release(), 0, ev->Cookie); + return; } TString tierName = meta.GetTierName(); - Y_VERIFY(!tierName.empty()); + Y_VERIFY_S(!tierName.empty(), evicted.ToString()); if (!GetExportedBlob(ctx, ev->Sender, ev->Cookie, tierName, std::move(evicted), std::move(msg.BlobRanges))) { - auto result = MakeErrorResponse(msg, TabletID(), status); + auto result = MakeErrorResponse(msg, TabletID(), NKikimrProto::EReplyStatus::ERROR); ctx.Send(ev->Sender, result.release(), 0, ev->Cookie); } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 706594ad1a4..0eeae1a22c1 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -111,7 +111,7 @@ private: NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = true, - .Fallback = fallback, + .ForceFallback = fallback, .IsBackgroud = false }; Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 47e6990c39b..1c0c1c936ac 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1077,6 +1077,12 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const THashSet<NOlap::T for (const auto& ev : evictedBlobs) { auto& blobId = ev.Blob; + if (BlobManager->BlobInUse(blobId)) { + LOG_S_DEBUG("Blob '" << blobId.ToStringNew() << "' in use at tablet " << TabletID()); + strBlobsDelayed += "'" + blobId.ToStringNew() + "' "; + continue; + } + TEvictMetadata meta; auto evict = BlobManager->GetDropped(blobId, meta); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index c557a367684..5748ca84064 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -2,6 +2,7 @@ #include "columnshard.h" #include "columnshard_impl.h" +#include "blob_cache.h" #include <ydb/core/formats/arrow_batch_builder.h> #include <ydb/core/scheme/scheme_tabledefs.h> @@ -13,6 +14,16 @@ 
namespace NKikimr::NTxUT { +// Private events of different actors reuse the same ES_PRIVATE range +// So in order to capture the right private event we need to check its type via dynamic_cast +template <class TPrivateEvent> +inline TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() != TPrivateEvent::EventType) { + return nullptr; + } + return dynamic_cast<TPrivateEvent*>(ev->StaticCastAsLocal<IEventBase>()); +} + class TTester : public TNonCopyable { public: static constexpr const ui64 FAKE_SCHEMESHARD_TABLET_ID = 4200; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 1b5d26ff400..66028849159 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -109,7 +109,7 @@ private: NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = false, - .Fallback = isExternal, + .ForceFallback = isExternal, .IsBackgroud = true }; Send(BlobCacheActorId, diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index e379a6b9189..39f116d73b2 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -106,7 +106,7 @@ private: NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = false, - .Fallback = isExternal, + .ForceFallback = isExternal, .IsBackgroud = true }; Send(BlobCacheActorId, diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index 5b9cfca6bbb..8dff625bf1d 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -95,7 +95,7 @@ private: NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = false, - .Fallback = false, + .ForceFallback = false, .IsBackgroud = true }; Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 35f9b7cb314..5412ba3633c 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -110,7 +110,7 @@ private: Y_VERIFY(blobRange.Size); NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = false, - .Fallback = false, + .ForceFallback = false, .IsBackgroud = true, .WithDeadline = false }; diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index cdec4279bc4..72beea01593 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -15,6 +15,7 @@ public: // until all the references are released. 
// NOTE: this ref counts are in-memory only, so the blobs can be deleted if tablet restarts virtual void SetBlobInUse(const NOlap::TUnifiedBlobId& blobId, bool inUse) = 0; + virtual bool BlobInUse(const NOlap::TUnifiedBlobId& blobId) const = 0; }; using NOlap::TReadMetadata; @@ -59,6 +60,9 @@ public: } else { it->second--; } + for (auto& rec : portion.Records) { + blobTracker.SetBlobInUse(rec.BlobRange.BlobId, false); + } } for (const auto& committedBlob : readMeta->CommittedBlobs) { @@ -97,6 +101,9 @@ private: for (const auto& portion : readMeta->SelectInfo->Portions) { const ui64 portionId = portion.Records[0].Portion; PortionUseCount[portionId]++; + for (auto& rec : portion.Records) { + blobTracker.SetBlobInUse(rec.BlobRange.BlobId, true); + } } for (const auto& committedBlob : readMeta->CommittedBlobs) { diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index a4f05397d71..69e8dc37c20 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -221,7 +221,7 @@ public: NBlobCache::TReadBlobRangeOptions readOpts { .CacheAfterRead = true, - .Fallback = fallback, + .ForceFallback = fallback, .IsBackgroud = false }; Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 0771ed4bf6a..af33c4f1a84 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2250,16 +2250,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } - // Private events of different actors reuse the same ES_PRIVATE range - // So in order to capture the right private event we need to check its type via dynamic_cast - template <class TPrivateEvent> - TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle> &ev) { - if (ev->GetTypeRewrite() != TPrivateEvent::EventType) { - return nullptr; - } - return dynamic_cast<TPrivateEvent*>(ev->StaticCastAsLocal<IEventBase>()); - } - void TestCompactionGC(bool enableSmallBlobs) { TTestBasicRuntime runtime; TTester::Setup(runtime); diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index a3426017684..a5847ac6772 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -168,6 +168,22 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10 return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId}); } +TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead, + std::optional<ui32> batchNo = 0, + std::optional<bool> finished = true) +{ + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1); + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + if (batchNo) { + UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), *batchNo); + } + if (finished) { + UNIT_ASSERT_EQUAL(resRead.GetFinished(), *finished); + } + return resRead.GetData(); +} + static constexpr ui32 PORTION_ROWS = 80 * 1000; // ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020 @@ -265,15 +281,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(event); auto& resRead = Proto(event); 
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); + TString data = GetReadResult(resRead); + UNIT_ASSERT(data.size() > 0); auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[1])); + UNIT_ASSERT(CheckSame(data, schema, PORTION_ROWS, spec.TtlColumn, ts[1])); } // Alter TTL @@ -308,12 +320,8 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(event); auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetData().size(), 0); + TString data = GetReadResult(resRead); + UNIT_ASSERT_VALUES_EQUAL(data.size(), 0); } // Disable TTL @@ -366,6 +374,8 @@ public: ui32 SuccessCounter = 0; ui32 ErrorsCounter = 0; ui32 ResponsesCounter = 0; + ui32 CaptureReadEvents = 0; + std::vector<TAutoPtr<IEventHandle>> CapturedReads; TString SerializeToString() const { TStringBuilder sb; @@ -399,47 +409,60 @@ public: Cerr << "FINISH_WAITING(" << attemption << "): " << SerializeToString() << Endl; SuccessCounterStart = SuccessCounter; } + + void WaitReadsCaptured(TTestBasicRuntime& runtime) const { + const TInstant startInstant = TAppData::TimeProvider->Now(); + const TInstant deadline = startInstant + TDuration::Seconds(10); + while (CaptureReadEvents && TAppData::TimeProvider->Now() < deadline) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + UNIT_ASSERT_VALUES_EQUAL(CaptureReadEvents, 0); + } + + void ResendCapturedReads(TTestBasicRuntime& runtime) { + for (auto& cev : CapturedReads) { + auto* msg = TryGetPrivateEvent<NBlobCache::TEvBlobCache::TEvReadBlobRange>(cev); + UNIT_ASSERT(msg); + Cerr << "RESEND " << msg->BlobRange.ToString() << " " + << msg->ReadOptions.ToString() << Endl; + runtime.Send(cev.Release()); + } + CapturedReads.clear(); + } }; class TEventsCounter { private: TCountersContainer* Counters = nullptr; TTestBasicRuntime& Runtime; - const TActorId Sender; - - template <class TPrivateEvent> - static TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) { - return ev->CastAsLocal<TPrivateEvent>(); - } public: - TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime, const TActorId sender) + TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime) : Counters(&counters) , Runtime(runtime) - , Sender(sender) { Y_UNUSED(Runtime); - Y_UNUSED(Sender); } bool operator()(TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { TStringBuilder ss; if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) { - ss << "EXPORT"; - if (msg->Status == NKikimrProto::OK) { - ss << "(" << ++Counters->SuccessCounter << "): SUCCESS"; - } - if (msg->Status == NKikimrProto::ERROR) { - ss << "(" << ++Counters->ErrorsCounter << "): ERROR"; - } - if (msg->Status == NKikimrProto::UNKNOWN) { - ss << "(" << ++Counters->UnknownsCounter << "): UNKNOWN"; - } + ss << "EXPORT(" << ++Counters->SuccessCounter << "): " << 
NKikimrProto::EReplyStatus_Name(msg->Status); } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) { ss << "S3_RESPONSE(put " << ++Counters->ResponsesCounter << "):"; } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectResponse>(ev)) { ss << "(" << ++Counters->SuccessCounter << "): DELETE SUCCESS"; ss << "S3_RESPONSE(delete " << ++Counters->ResponsesCounter << "):"; + } else if (auto* msg = TryGetPrivateEvent<NBlobCache::TEvBlobCache::TEvReadBlobRange>(ev)) { + if (Counters->CaptureReadEvents) { + Cerr << "CAPTURE " << msg->BlobRange.ToString() << " " + << msg->ReadOptions.ToString() << Endl; + --Counters->CaptureReadEvents; + Counters->CapturedReads.push_back(ev.Release()); + return true; + } else { + return false; + } } else { return false; } @@ -467,6 +490,18 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); + // Disable blob cache. It hides evict-delete, evict-read races. + { + TAtomic unused; + runtime.GetAppData().Icb->SetValue("BlobCache.MaxCacheDataSize", 0, unused); + } + + // Disable GC batching so that deleted blobs get collected without a delay + { + TAtomic unused; + runtime.GetAppData().Icb->SetValue("ColumnShardControls.BlobCountToTriggerGC", 1, unused); + } + // ui64 metaShard = TTestTxConfig::TxTablet1; @@ -503,7 +538,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt specRowsBytes.reserve(specs.size()); TCountersContainer counter; - runtime.SetEventFilter(TEventsCounter(counter, runtime, sender)); + runtime.SetEventFilter(TEventsCounter(counter, runtime)); for (ui32 i = 0; i < specs.size(); ++i) { bool hasColdEviction = false; for (auto&& i : specs[i].Tiers) { @@ -526,44 +561,71 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); } + // Read crossed with eviction (start) + { + auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId); + Proto(read.get()).AddColumnNames(specs[i].TtlColumn); + + counter.CaptureReadEvents = 1; // TODO: we need affected by tiering blob here + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); + counter.WaitReadsCaptured(runtime); + } + + // Eviction + TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn); + + Cerr << (hasColdEviction ? 
"Cold" : "Hot") + << " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; + if (hasColdEviction) { - Cerr << "Cold tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; if (i > initialEviction) { counter.WaitEvents(runtime, i, 1, TDuration::Seconds(40)); } else { counter.WaitEvents(runtime, i, 0, TDuration::Seconds(20)); } } else { - Cerr << "Hot tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; counter.WaitEvents(runtime, i, 0, TDuration::Seconds(4)); } if (reboots) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); } - // Read + // Read crossed with eviction (finish) + { + counter.ResendCapturedReads(runtime); + ui32 numBatches = 0; + THashSet<ui32> batchNumbers; + while (!numBatches || numBatches < batchNumbers.size()) { + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + + auto& resRead = Proto(event); + TString data = GetReadResult(resRead, {}, {}); + batchNumbers.insert(resRead.GetBatch()); + if (resRead.GetFinished()) { + numBatches = resRead.GetBatch() + 1; + } + } + } + + // Read data after eviction - --planStep; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); + auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId); Proto(read.get()).AddColumnNames(specs[i].TtlColumn); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); + specRowsBytes.emplace_back(0, 0); - ui32 idx = 0; while (true) { auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); UNIT_ASSERT(event); auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), idx++); - - if (!resRead.GetData().size()) { + TString data = GetReadResult(resRead, {}, {}); + if (!data.size()) { break; } + auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, specs[i].TtlColumn); @@ -841,12 +903,8 @@ void TestDrop(bool reboots) { UNIT_ASSERT(event); auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData().size(), 0); + TString data = GetReadResult(resRead); + UNIT_ASSERT_EQUAL(data.size(), 0); } } |