diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-04-18 15:13:58 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-04-18 15:13:58 +0300 |
commit | d16fc999f8322e88777cea8bde5b94eda867fb19 (patch) | |
tree | 2f7dba65f9dc23bc0975113cbe0cd1b907b7ff9c | |
parent | c046b295578935b7ff07beb0913a315caa8f92de (diff) | |
download | ydb-d16fc999f8322e88777cea8bde5b94eda867fb19.tar.gz |
KIKIMR-13593: read from S3
ref:dd077b5b7ea4145cc71dea1d4c3c515d491896d5
21 files changed, 508 insertions, 202 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index 250b7db6eb..97cbb54b82 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -330,6 +330,10 @@ struct TEvictedBlob { ui64 Hash() const noexcept { return Blob.Hash(); } + + bool IsExternal() const { + return ExternBlob.IsValid(); + } }; } diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index 1d676cd21a..0ef5cb89c7 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -13,6 +13,24 @@ namespace NKikimr::NBlobCache { +namespace { + // Blobs with same tagret can be read in a single request + // (e.g. DS blobs from the same tablet residing on the same DS group, or 2 small blobs from the same tablet) + std::pair<ui64, ui32> BlobSource(const TUnifiedBlobId& blobId) { + Y_VERIFY(blobId.IsValid()); + + if (blobId.IsDsBlob()) { + // Tablet & group restriction + return {blobId.GetTabletId(), blobId.GetDsGroup()}; + } else if (blobId.IsSmallBlob()) { + // Tablet restriction, no group restrictions + return {blobId.GetTabletId(), 0}; + } + + return {0, 0}; + } +} + using namespace NActors; class TBlobCache: public TActorBootstrapped<TBlobCache> { @@ -26,6 +44,11 @@ private: {} }; + struct TReadItem { + TBlobRange BlobRange; + bool Fallback{false}; + }; + static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20; static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20; static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(5); @@ -35,17 +58,18 @@ private: // It is used to remove all blob ranges from cache when // it gets a notification that a blob has been deleted TControlWrapper MaxCacheDataSize; + TControlWrapper MaxCacheExternalDataSize; // Cache size for extern (i.e. S3) blobs TControlWrapper MaxInFlightDataSize; i64 CacheDataSize; // Current size of all blobs in cache ui64 ReadCookie; THashMap<ui64, std::vector<TBlobRange>> CookieToRange; // All in-flight requests THashMap<TBlobRange, TReadInfo> OutstandingReads; // All in-flight and enqueued reads - TDeque<TBlobRange> ReadQueue; // Reads that are waiting to be sent + TDeque<TReadItem> ReadQueue; // Reads that are waiting to be sent // TODO: Consider making per-group queues i64 InFlightDataSize; // Current size of all in-flight blobs THashMap<ui64, TActorId> ShardPipes; // TabletId -> PipeClient for small blob read requests - THashMap<ui64, THashSet<ui64>> InFlightSmallBlobRequests; // TabletId -> list to read cookies + THashMap<ui64, THashSet<ui64>> InFlightTabletRequests; // TabletId -> list to read cookies using TCounterPtr = NMonitoring::TDynamicCounters::TCounterPtr; const TCounterPtr SizeBytes; @@ -75,6 +99,7 @@ public: : TActorBootstrapped<TBlobCache>() , Cache(SIZE_MAX) , MaxCacheDataSize(maxSize, 0, 1ull << 40) + , MaxCacheExternalDataSize(0, 0, 1ull << 40) , MaxInFlightDataSize(Min<i64>(MaxCacheDataSize, MAX_IN_FLIGHT_BYTES), 0, 10ull << 30) , CacheDataSize(0) , ReadCookie(1) @@ -98,8 +123,10 @@ public: {} void Bootstrap(const TActorContext& ctx) { - AppData(ctx)->Icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize"); - AppData(ctx)->Icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize"); + auto& icb = AppData(ctx)->Icb; + icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize"); + icb->RegisterSharedControl(MaxCacheExternalDataSize, "BlobCache.MaxCacheExternalDataSize"); + icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize"); Become(&TBlobCache::StateFunc); ScheduleWakeup(); } @@ -142,17 +169,18 @@ private: void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) { const TBlobRange& blobRange = ev->Get()->BlobRange; - const bool promote = ev->Get()->Cache; + const bool promote = ev->Get()->CacheAfterRead; + const bool fallback = ev->Get()->Fallback; - LOG_S_DEBUG("Read request: " << blobRange); + LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback); - HandleSingleRangeRead(blobRange, promote, ev->Sender, ctx); + HandleSingleRangeRead(blobRange, promote, fallback, ev->Sender, ctx); MakeReadRequests(ctx); } void HandleSingleRangeRead(const TBlobRange& blobRange, - const bool promoteInCache, const TActorId& sender, const TActorContext& ctx) + bool promoteInCache, bool fallback, const TActorId& sender, const TActorContext& ctx) { // Is in cache? auto it = promoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange); @@ -164,7 +192,13 @@ private: Misses->Inc(); - // Is outsanding? + // Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits. + // For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize + if (fallback && MaxCacheExternalDataSize == 0) { + promoteInCache = false; + } + + // Is outstanding? auto readIt = OutstandingReads.find(blobRange); if (readIt != OutstandingReads.end()) { readIt->second.Waiting.push_back(sender); @@ -172,15 +206,16 @@ private: return; } - EnqueueRead(blobRange, promoteInCache, sender); + EnqueueRead(blobRange, promoteInCache, sender, fallback); } void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) { const auto& ranges = ev->Get()->BlobRanges; + bool fallback = ev->Get()->Fallback; LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " ")); for (const auto& blobRange : ranges) { - HandleSingleRangeRead(blobRange, ev->Get()->Cache, ev->Sender, ctx); + HandleSingleRangeRead(blobRange, ev->Get()->CacheAfterRead, fallback, ev->Sender, ctx); } MakeReadRequests(ctx); @@ -240,14 +275,14 @@ private: CachedRanges.erase(blobIdIt); } - void EnqueueRead(const TBlobRange& blobRange, const bool promoteInCache, const TActorId& sender) { + void EnqueueRead(const TBlobRange& blobRange, bool promoteInCache, const TActorId& sender, bool fallback) { TReadInfo& blobInfo = OutstandingReads[blobRange]; blobInfo.Waiting.push_back(sender); blobInfo.Cache = promoteInCache; LOG_S_DEBUG("Enqueue read range: " << blobRange); - ReadQueue.push_back(blobRange); + ReadQueue.push_back(TReadItem{blobRange, fallback}); ReadsInQueue->Set(ReadQueue.size()); } @@ -284,68 +319,74 @@ private: ReadRequests->Inc(); } - // Checks if 2 blobs can be read in a single request (e.g. DS blobs from the same - // tablet residing on the same DS group, or 2 small blobs from the same tablet) - inline bool CanBatchReads(const TUnifiedBlobId& a, const TUnifiedBlobId& b) { - if (a.GetType() != b.GetType()) { - return false; - } + void MakeReadRequests(const TActorContext& ctx) { + THashMap<std::pair<ui64, ui32>, std::vector<TBlobRange>> groupedBlobRanges; + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges; + + while (!ReadQueue.empty()) { + const auto& readItem = ReadQueue.front(); + const TBlobRange& blobRange = readItem.BlobRange; + + if (readItem.Fallback) { + // For now it's always possible to add external read cause we must not block ReadQueue by such reads + // TODO: separate ReadQueue and InFlightDataSize for external reads + fallbackRanges[blobRange.BlobId].push_back(blobRange); + } else { + // NOTE: if queue is not empty, at least 1 in-flight request is allowed + if (InFlightDataSize && InFlightDataSize >= MaxInFlightDataSize) { + break; + } + InFlightDataSize += blobRange.Size; - if (!a.IsValid()) { - return false; - } + std::pair<ui64, ui32> blobSrc = BlobSource(blobRange.BlobId); + groupedBlobRanges[blobSrc].push_back(blobRange); + } - // Same tablet and same DS group? - if (a.IsDsBlob()) { - return a.GetTabletId() == b.GetTabletId() && - a.GetDsGroup() == b.GetDsGroup(); - } + SizeBytesInFlight->Add(blobRange.Size); + SizeBlobsInFlight->Inc(); - // Small blobs from the same tablet? - if (a.IsSmallBlob()) { - return a.GetTabletId() == b.GetTabletId(); + ReadQueue.pop_front(); } - return false; - } + ReadsInQueue->Set(ReadQueue.size()); + + // We might need to free some space to accomodate the results of new reads + Evict(ctx); - void MakeReadRequests(const TActorContext& ctx) { - std::vector<TBlobRange> blobRanges; ui64 cookie = ++ReadCookie; - ui64 requestSize = 0; - // NOTE: if queue is not empty, at least 1 in-flight request is allowed - while (!ReadQueue.empty() && (InFlightDataSize == 0 || InFlightDataSize < MaxInFlightDataSize)) { - const TBlobRange& blobRange = ReadQueue.front(); + for (auto& [target, rangesGroup] : groupedBlobRanges) { + std::vector<TBlobRange> rangesBatch; + rangesBatch.reserve(rangesGroup.size()); + ui64 requestSize = 0; + + for (auto& blobRange : rangesGroup) { + if (!rangesBatch.empty() && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) { + SendBatchReadRequest(rangesBatch, cookie, ctx); + rangesBatch.clear(); + cookie = ++ReadCookie; + requestSize = 0; + } - // Only group ranges from the same Tablet and same DS group - if (!blobRanges.empty() && ( - !CanBatchReads(blobRanges.back().BlobId, blobRange.BlobId) || - requestSize > MAX_REQUEST_BYTES - )) - { - SendBatchReadRequest(blobRanges, cookie, ctx); - blobRanges.clear(); - cookie = ++ReadCookie; - requestSize = 0; + rangesBatch.push_back(blobRange); + requestSize += blobRange.Size; + CookieToRange[cookie].push_back(blobRange); } - blobRanges.push_back(blobRange); - requestSize += blobRange.Size; - CookieToRange[cookie].push_back(blobRange); - - SizeBytesInFlight->Add(blobRange.Size); - SizeBlobsInFlight->Inc(); - InFlightDataSize += blobRange.Size; + if (!rangesBatch.empty()) { + SendBatchReadRequest(rangesBatch, cookie, ctx); + } + } - // We might need to free some space to accomodate the results of new reads - Evict(ctx); + for (auto& [blobId, ranges] : fallbackRanges) { + Y_VERIFY(blobId.IsDsBlob()); + Y_VERIFY(!ranges.empty()); - ReadQueue.pop_front(); - } - if (!blobRanges.empty()) { - SendBatchReadRequest(blobRanges, cookie, ctx); + cookie = ++ReadCookie; + for (auto& blobRange : ranges) { + CookieToRange[cookie].push_back(blobRange); + } + SendBatchReadRequestToTablet(ranges, cookie, ctx); } - ReadsInQueue->Set(ReadQueue.size()); } void SendResult(const TActorId& to, const TBlobRange& blobRange, NKikimrProto::EReplyStatus status, @@ -445,15 +486,9 @@ private: ShardPipes[tabletId] = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); } - auto ev = std::make_unique<TEvColumnShard::TEvReadBlobRanges>(); - for (const auto& r : blobRanges) { - auto* range = ev->Record.AddBlobRanges(); - range->SetBlobId(r.BlobId.ToStringNew()); - range->SetOffset(r.Offset); - range->SetSize(r.Size); - } + auto ev = std::make_unique<TEvColumnShard::TEvReadBlobRanges>(blobRanges); - InFlightSmallBlobRequests[tabletId].insert(cookie); + InFlightTabletRequests[tabletId].insert(cookie); NTabletPipe::SendData(ctx, ShardPipes[tabletId], ev.release(), cookie); } @@ -461,8 +496,8 @@ private: void DestroyPipe(ui64 tabletId, const TActorContext& ctx) { ShardPipes.erase(tabletId); // Send errors for in-flight requests - auto cookies = std::move(InFlightSmallBlobRequests[tabletId]); - InFlightSmallBlobRequests.erase(tabletId); + auto cookies = std::move(InFlightTabletRequests[tabletId]); + InFlightTabletRequests.erase(tabletId); for (ui64 readCookie : cookies) { auto cookieIt = CookieToRange.find(readCookie); if (cookieIt == CookieToRange.end()) { @@ -509,7 +544,7 @@ private: ui64 readCookie = ev->Cookie; LOG_S_DEBUG("Got read result from tablet: " << tabletId); - InFlightSmallBlobRequests[tabletId].erase(readCookie); + InFlightTabletRequests[tabletId].erase(readCookie); auto cookieIt = CookieToRange.find(readCookie); if (cookieIt == CookieToRange.end()) { diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h index 6901eb218b..68ae86f5e0 100644 --- a/ydb/core/tx/columnshard/blob_cache.h +++ b/ydb/core/tx/columnshard/blob_cache.h @@ -36,11 +36,13 @@ struct TEvBlobCache { struct TEvReadBlobRange : public NActors::TEventLocal<TEvReadBlobRange, EvReadBlobRange> { TBlobRange BlobRange; - bool Cache; - // TODO: pass some kind of priority? - explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cache = true) + bool CacheAfterRead; + bool Fallback; + + explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cacheResult, bool fallback) : BlobRange(blobRange) - , Cache(cache) + , CacheAfterRead(cacheResult) + , Fallback(fallback) {} }; @@ -48,12 +50,20 @@ struct TEvBlobCache { // This is usefull to save IOPs when reading multiple columns from the same blob struct TEvReadBlobRangeBatch : public NActors::TEventLocal<TEvReadBlobRangeBatch, EvReadBlobRangeBatch> { std::vector<TBlobRange> BlobRanges; - bool Cache; - // TODO: pass some kind of priority? - explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cache = true) - : BlobRanges(blobRanges) - , Cache(cache) - {} + bool CacheAfterRead; + bool Fallback; + + explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cacheResult, bool fallback) + : BlobRanges(std::move(blobRanges)) + , CacheAfterRead(cacheResult) + , Fallback(fallback) + { + if (fallback) { + for (const auto& blobRange : BlobRanges) { + Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId); + } + } + } }; struct TEvReadBlobRangeResult : public NActors::TEventLocal<TEvReadBlobRangeResult, EvReadBlobRangeResult> { diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 48c66c894e..bf958b428a 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -570,6 +570,15 @@ bool TBlobManager::LoadOneToOneExport(IBlobManagerDb& db) { return true; } +TEvictedBlob TBlobManager::GetEvicted(const TUnifiedBlobId& blobId, TEvictMetadata& meta) { + auto it = EvictedBlobs.find(TEvictedBlob{.Blob = blobId}); + if (it != EvictedBlobs.end()) { + meta = it->second; + return it->first; + } + return {}; +} + TEvictedBlob TBlobManager::GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) { auto it = DroppedEvictedBlobs.find(TEvictedBlob{.Blob = blobId}); if (it != DroppedEvictedBlobs.end()) { diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index 12611e835e..8bb09011f9 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -95,8 +95,9 @@ public: virtual bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) = 0; virtual bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) = 0; virtual bool LoadOneToOneExport(IBlobManagerDb& db) = 0; - //virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0; + virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0; virtual TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) = 0; + virtual bool HasExternBlobs() const = 0; }; // Garbage Collection generation and step @@ -236,8 +237,13 @@ public: bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) override; bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) override; bool LoadOneToOneExport(IBlobManagerDb& db) override; + TEvictedBlob GetEvicted(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override; TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override; + bool HasExternBlobs() const override { + return EvictedBlobs.size() || DroppedEvictedBlobs.size(); + } + // Implementation of IBlobInUseTracker void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override; diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 62486e3c33..de512d9e72 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -13,6 +13,10 @@ namespace NKikimr { +namespace NColumnShard { +class TBlobGroupSelector; +} + struct TEvColumnShard { enum EEv { EvProposeTransaction = EventSpaceBegin(TKikimrEvents::ES_TX_COLUMNSHARD), @@ -127,11 +131,39 @@ struct TEvColumnShard { } }; - // Read small blobs from the tablet + // Fallback read BlobCache read to tablet (small blobs or S3) struct TEvReadBlobRanges : public TEventPB<TEvReadBlobRanges, NKikimrTxColumnShard::TEvReadBlobRanges, TEvColumnShard::EvReadBlobRanges> { + std::vector<NOlap::TBlobRange> BlobRanges; + + TEvReadBlobRanges() = default; + + TEvReadBlobRanges(const std::vector<NOlap::TBlobRange>& blobRanges) + : BlobRanges(blobRanges) + { + for (const auto& r : BlobRanges) { + auto* range = Record.AddBlobRanges(); + range->SetBlobId(r.BlobId.ToStringNew()); + range->SetOffset(r.Offset); + range->SetSize(r.Size); + } + } + + void RestoreFromProto(NColumnShard::TBlobGroupSelector* dsGroupSelector, TString& errString) { + BlobRanges.clear(); + BlobRanges.reserve(Record.BlobRangesSize()); + + for (const auto& range : Record.GetBlobRanges()) { + auto blobId = NColumnShard::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector, + errString); + if (!errString.empty()) { + return; + } + BlobRanges.push_back(NOlap::TBlobRange{blobId, (ui32)range.GetOffset(), (ui32)range.GetSize()}); + } + } }; struct TEvReadBlobRangesResult : public TEventPB<TEvReadBlobRangesResult, diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 73723847c5..1db2290a4e 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -36,7 +36,7 @@ private: }; -NOlap::TReadMetadata::TPtr +std::shared_ptr<NOlap::TReadMetadata> TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read, const std::unique_ptr<NOlap::TInsertTable>& insertTable, const std::unique_ptr<NOlap::IColumnEngine>& index, @@ -250,13 +250,16 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo())); + std::shared_ptr<NOlap::TReadMetadata> metadata; if (parseResult) { - ReadMetadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription); + metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription); } ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR; - if (ReadMetadata) { + if (metadata) { + Self->MapExternBlobs(ctx, *metadata); + ReadMetadata = metadata; status = NKikimrTxColumnShard::EResultStatus::SUCCESS; } diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp index b742a726a7..1dd732b3d1 100644 --- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp @@ -118,9 +118,75 @@ void TTxReadBlobRanges::Complete(const TActorContext& ctx) { LOG_S_DEBUG("TTxReadBlobRanges.Complete at tablet " << Self->TabletID()); } +static std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult> +MakeErrorResponse(const TEvColumnShard::TEvReadBlobRanges& msg, ui64 tabletId, ui32 status) { + auto result = std::make_unique<TEvColumnShard::TEvReadBlobRangesResult>(tabletId); + for (const auto& range : msg.Record.GetBlobRanges()) { + auto* res = result->Record.AddResults(); + res->MutableBlobRange()->CopyFrom(range); + res->SetStatus(status); + } + return result; +} + void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) { - LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record); - Execute(new TTxReadBlobRanges(this, ev), ctx); + auto& msg = *ev->Get(); + + LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << msg.Record); + + if (msg.BlobRanges.empty()) { + TBlobGroupSelector dsGroupSelector(Info()); + TString errString; + msg.RestoreFromProto(&dsGroupSelector, errString); + Y_VERIFY_S(errString.empty(), errString); + } + + std::optional<TUnifiedBlobId> evictedBlobId; + bool isSmall = false; + bool isFallback = false; + bool isOther = false; + for (const auto& range : msg.BlobRanges) { + auto& blobId = range.BlobId; + if (blobId.IsSmallBlob()) { + isSmall = true; + } else if (blobId.IsDsBlob()) { + isFallback = true; + if (evictedBlobId) { + // Can read only one blobId at a time (but multiple ranges from it) + Y_VERIFY(evictedBlobId == blobId); + } else { + evictedBlobId = blobId; + } + } else { + isOther = true; + } + } + + Y_VERIFY(isSmall != isFallback && !isOther); + + if (isSmall) { + Execute(new TTxReadBlobRanges(this, ev), ctx); + } else if (isFallback) { + Y_VERIFY(evictedBlobId->IsValid()); + + ui32 status = NKikimrProto::EReplyStatus::ERROR; + + NKikimrTxColumnShard::TEvictMetadata meta; + auto evicted = BlobManager->GetEvicted(*evictedBlobId, meta); + + if (!evicted.Blob.IsValid() || !evicted.ExternBlob.IsValid()) { + auto result = MakeErrorResponse(msg, TabletID(), status); + ctx.Send(ev->Sender, result.release(), 0, ev->Cookie); + } + + TString tierName = meta.GetTierName(); + Y_VERIFY(!tierName.empty()); + + if (!GetExportedBlob(ctx, ev->Sender, ev->Cookie, tierName, std::move(evicted), std::move(msg.BlobRanges))) { + auto result = MakeErrorResponse(msg, TabletID(), status); + ctx.Send(ev->Sender, result.release(), 0, ev->Cookie); + } + } } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 8dc0483b55..943360b370 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -31,7 +31,7 @@ public: TTxType GetTxType() const override { return TXTYPE_START_SCAN; } private: - NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, + std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, bool isIndexStats, bool isReverse, ui64 limit); private: @@ -105,7 +105,10 @@ private: if (!blobRange.BlobId.IsValid()) { return false; } - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange)); + + auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs; + bool fallback = externBlobs && externBlobs->count(blobRange.BlobId); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback)); ++InFlightReads; InFlightReadBytes += blobRange.Size; return true; @@ -509,7 +512,7 @@ static void FillPredicatesFromRange(TReadDescription& read, const ::NKikimrTx::T } } -NOlap::TReadStatsMetadata::TPtr +std::shared_ptr<NOlap::TReadStatsMetadata> PrepareStatsReadMetadata(ui64 tabletId, const TReadDescription& read, const std::unique_ptr<NOlap::IColumnEngine>& index, TString& error) { THashSet<ui32> readColumnIds(read.ColumnIds.begin(), read.ColumnIds.end()); for (auto& [id, name] : read.ProgramSourceColumns) { @@ -567,10 +570,10 @@ PrepareStatsReadMetadata(ui64 tabletId, const TReadDescription& read, const std: return out; } -NOlap::TReadMetadataBase::TConstPtr TTxScan::CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, +std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, bool indexStats, bool isReverse, ui64 itemsLimit) { - NOlap::TReadMetadataBase::TPtr metadata; + std::shared_ptr<NOlap::TReadMetadataBase> metadata; if (indexStats) { metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription); } else { @@ -641,6 +644,9 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { if (!record.RangesSize()) { auto range = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit); if (range) { + if (!isIndexStats) { + Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*range)); + } ReadMetadataRanges = {range}; } return true; @@ -659,6 +665,9 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { ReadMetadataRanges.clear(); return true; } + if (!isIndexStats) { + Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*newRange)); + } ReadMetadataRanges.emplace_back(newRange); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 8e2d7f05ad..f0ad337377 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -698,13 +698,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { ActiveCompaction = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, Settings.CacheDataAfterCompaction); - return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev)); + return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager); } std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, bool force) { if (ActiveTtl) { - LOG_S_DEBUG("Ttl already in progress at tablet " << TabletID()); + LOG_S_DEBUG("TTL already in progress at tablet " << TabletID()); return {}; } if (!PrimaryIndex) { @@ -740,7 +740,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u ActiveTtl = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false); - return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), needWrites); + return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { @@ -789,7 +789,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes , false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes, false); ev->PutStatus = NKikimrProto::OK; // No new blobs to write ActiveCleanup = true; @@ -859,38 +859,79 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl return indexInfo; } -void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, - THashMap<TUnifiedBlobId, TString>&& blobsIds) { - if (!S3Actors.count(tierName)) { - TString tier(tierName); - LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); +void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) { + if (!metadata.SelectInfo) { return; } - auto& s3 = S3Actors[tierName]; - if (!s3) { - TString tier(tierName); - LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); + + if (!BlobManager->HasExternBlobs()) { return; } - auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds)); - ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); + + THashSet<TUnifiedBlobId> uniqBlobs; + for (auto& portion : metadata.SelectInfo->Portions) { + for (auto& rec : portion.Records) { + uniqBlobs.insert(rec.BlobRange.BlobId); + } + } + + THashMap<TUnifiedBlobId, TUnifiedBlobId> extMap; + + for (auto& blobId : uniqBlobs) { + TEvictMetadata meta; + auto evicted = BlobManager->GetEvicted(blobId, meta); + if (evicted.ExternBlob.IsValid()) { + extMap[blobId] = evicted.ExternBlob; + } + } + + if (!extMap.empty()) { + metadata.ExternBlobs = std::make_shared<const THashMap<TUnifiedBlobId, TUnifiedBlobId>>(std::move(extMap)); + } } -void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, - std::vector<NOlap::TEvictedBlob>&& blobs) { +TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) { if (!S3Actors.count(tierName)) { - LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID()); - return; + LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); + return {}; } - auto& s3 = S3Actors[tierName]; + auto s3 = S3Actors[tierName]; if (!s3) { - LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID()); - return; + LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); + return {}; + } + return s3; +} + +void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, + THashMap<TUnifiedBlobId, TString>&& blobsIds) { + if (auto s3 = GetS3ActorForTier(tierName, "export")) { + auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds)); + ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); } +} + +void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, + std::vector<NOlap::TEvictedBlob>&& blobs) { + if (auto s3 = GetS3ActorForTier(tierName, "forget")) { + auto forget = std::make_unique<TEvPrivate::TEvForget>(); + forget->Evicted = std::move(blobs); + ctx.Send(s3, forget.release()); + } +} - auto forget = std::make_unique<TEvPrivate::TEvForget>(); - forget->Evicted = std::move(blobs); - ctx.Send(s3, forget.release()); +bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, + NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) { + if (auto s3 = GetS3ActorForTier(tierName, "get exported")) { + auto get = std::make_unique<TEvPrivate::TEvGetExported>(); + get->DstActor = dst; + get->DstCookie = cookie; + get->Evicted = std::move(evicted); + get->BlobRanges = std::move(ranges); + ctx.Send(s3, get.release()); + return true; + } + return false; } ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 6591cae2f9..943263d2a4 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -426,9 +426,13 @@ private: void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); + void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); + TActorId GetS3ActorForTier(const TString& tierName, const TString& phase); void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, THashMap<TUnifiedBlobId, TString>&& blobsIds); void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs); + bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, + NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges); ui32 InitS3Actors(const TActorContext& ctx, bool init); void StopS3Actors(const TActorContext& ctx); diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h index 076efaa758..4f6a0e5372 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_txs.h @@ -23,6 +23,7 @@ struct TEvPrivate { EvS3Settings, EvExport, EvForget, + EvGetExported, EvEnd }; @@ -60,21 +61,52 @@ struct TEvPrivate { struct TEvCompaction : public TEventLocal<TEvCompaction, EvIndexing> { std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges; + THashSet<TUnifiedBlobId> Externals; - explicit TEvCompaction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent) + explicit TEvCompaction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, IBlobExporter& blobManager) : TxEvent(std::move(txEvent)) { TxEvent->GranuleCompaction = true; + Y_VERIFY(TxEvent->IndexChanges); + + GroupedBlobRanges = NOlap::TColumnEngineChanges::GroupedBlobRanges(TxEvent->IndexChanges->SwitchedPortions); + + if (blobManager.HasExternBlobs()) { + for (auto& [blobId, _] : GroupedBlobRanges) { + TEvictMetadata meta; + if (blobManager.GetEvicted(blobId, meta).IsExternal()) { + Externals.insert(blobId); + } + } + } } }; struct TEvEviction : public TEventLocal<TEvEviction, EvEviction> { std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges; + THashSet<TUnifiedBlobId> Externals; - explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, bool needWrites) + explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, IBlobExporter& blobManager, + bool needWrites) : TxEvent(std::move(txEvent)) { - if (!needWrites) { + Y_VERIFY(TxEvent->IndexChanges); + + if (needWrites) { + GroupedBlobRanges = + NOlap::TColumnEngineChanges::GroupedBlobRanges(TxEvent->IndexChanges->PortionsToEvict); + + if (blobManager.HasExternBlobs()) { + for (auto& [blobId, _] : GroupedBlobRanges) { + TEvictMetadata meta; + if (blobManager.GetEvicted(blobId, meta).IsExternal()) { + Externals.insert(blobId); + } + } + } + } else { TxEvent->PutStatus = NKikimrProto::OK; } } @@ -132,6 +164,13 @@ struct TEvPrivate { TString ErrorStr; }; + struct TEvGetExported : public TEventLocal<TEvGetExported, EvGetExported> { + TActorId DstActor; // It's a BlobCache actor. S3 actor sends TEvReadBlobRangesResult to it as result + ui64 DstCookie; + NOlap::TEvictedBlob Evicted; + std::vector<NOlap::TBlobRange> BlobRanges; + }; + struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> { TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {} ui64 Rows; @@ -209,7 +248,7 @@ protected: : TBase(self) {} - NOlap::TReadMetadata::TPtr PrepareReadMetadata( + std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata( const TActorContext& ctx, const TReadDescription& readDescription, const std::unique_ptr<NOlap::TInsertTable>& insertTable, diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 1e61e2b5f1..b64ed17140 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -18,7 +18,7 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {} - void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) { auto& event = *ev->Get(); TxEvent = std::move(event.TxEvent); Y_VERIFY(TxEvent); @@ -30,23 +30,14 @@ public: LOG_S_DEBUG("Granules compaction: " << *indexChanges << " at tablet " << TabletId); - auto& switchedPortions = indexChanges->SwitchedPortions; - Y_VERIFY(switchedPortions.size()); + for (auto& [blobId, ranges] : event.GroupedBlobRanges) { + Y_VERIFY(!ranges.empty()); - for (auto& portionInfo : switchedPortions) { - Y_VERIFY(!portionInfo.Empty()); - std::vector<NBlobCache::TBlobRange> ranges; - for (auto& rec : portionInfo.Records) { - auto& blobRange = rec.BlobRange; + for (const auto& blobRange : ranges) { + Y_VERIFY(blobId == blobRange.BlobId); Blobs[blobRange] = {}; - // Group only ranges from the same blob into one request - if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) { - SendReadRequest(ctx, std::move(ranges)); - ranges = {}; - } - ranges.push_back(blobRange); } - SendReadRequest(ctx, std::move(ranges)); + SendReadRequest(std::move(ranges), event.Externals.count(blobId)); } } @@ -110,11 +101,11 @@ private: NumRead = 0; } - void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) { - if (ranges.empty()) - return; + void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) { + Y_VERIFY(!ranges.empty()); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false)); + Send(BlobCacheActorId, + new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal)); } void CompactGranules(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 8fb6ac424a..c7d9b20a6f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -161,6 +161,36 @@ public: return size; } + static THashMap<TUnifiedBlobId, std::vector<TBlobRange>> + GroupedBlobRanges(const TVector<TPortionInfo>& portions) { + Y_VERIFY(portions.size()); + + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges; + for (auto& portionInfo : portions) { + Y_VERIFY(!portionInfo.Empty()); + + for (auto& rec : portionInfo.Records) { + sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange); + } + } + return sameBlobRanges; + } + + static THashMap<TUnifiedBlobId, std::vector<TBlobRange>> + GroupedBlobRanges(const TVector<std::pair<TPortionInfo, TString>>& portions) { + Y_VERIFY(portions.size()); + + THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges; + for (auto& [portionInfo, _] : portions) { + Y_VERIFY(!portionInfo.Empty()); + + for (auto& rec : portionInfo.Records) { + sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange); + } + } + return sameBlobRanges; + } + friend IOutputStream& operator << (IOutputStream& out, const TColumnEngineChanges& changes) { if (ui32 switched = changes.SwitchedPortions.size()) { out << "switch " << switched << " portions"; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 938c141fbc..7509cc1dac 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -31,7 +31,6 @@ struct TReadStats { // Holds all metedata that is needed to perform read/scan struct TReadMetadataBase { - using TPtr = std::shared_ptr<TReadMetadataBase>; using TConstPtr = std::shared_ptr<const TReadMetadataBase>; enum class ESorting { @@ -48,6 +47,7 @@ struct TReadMetadataBase { std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications std::vector<std::shared_ptr<NArrow::TProgramStep>> Program; + std::shared_ptr<const THashMap<TUnifiedBlobId, TUnifiedBlobId>> ExternBlobs; // DS -> S3 map TODO: move out of base ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches ui64 Limit{0}; // TODO @@ -74,7 +74,6 @@ struct TReadMetadataBase { // Holds all metadata that is needed to perform read/scan struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> { - using TPtr = std::shared_ptr<TReadMetadata>; using TConstPtr = std::shared_ptr<const TReadMetadata>; TIndexInfo IndexInfo; @@ -154,7 +153,6 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_ }; struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> { - using TPtr = std::shared_ptr<TReadStatsMetadata>; using TConstPtr = std::shared_ptr<const TReadStatsMetadata>; const ui64 TabletId; diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index da752e96d2..4cdef39b0b 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -18,7 +18,7 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {} - void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& /*ctx*/) { auto& event = *ev->Get(); TxEvent = std::move(event.TxEvent); Y_VERIFY(TxEvent); @@ -29,23 +29,14 @@ public: LOG_S_DEBUG("Portions eviction: " << *indexChanges << " at tablet " << TabletId); - auto& evictedPortions = indexChanges->PortionsToEvict; - Y_VERIFY(evictedPortions.size()); + for (auto& [blobId, ranges] : event.GroupedBlobRanges) { + Y_VERIFY(!ranges.empty()); - for (auto& [portionInfo, tierName] : evictedPortions) { - Y_VERIFY(!portionInfo.Empty()); - std::vector<NBlobCache::TBlobRange> ranges; - for (auto& rec : portionInfo.Records) { - auto& blobRange = rec.BlobRange; + for (const auto& blobRange : ranges) { + Y_VERIFY(blobId == blobRange.BlobId); Blobs[blobRange] = {}; - // Group only ranges from the same blob into one request - if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) { - SendReadRequest(ctx, std::move(ranges)); - ranges = {}; - } - ranges.push_back(blobRange); } - SendReadRequest(ctx, std::move(ranges)); + SendReadRequest(std::move(ranges), event.Externals.count(blobId)); } } @@ -109,12 +100,11 @@ private: NumRead = 0; } - void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) { - if (ranges.empty()) { - return; - } + void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) { + Y_VERIFY(!ranges.empty()); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false)); + Send(BlobCacheActorId, + new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal)); } void EvictPortions(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index 3963279ffc..4038f1cde3 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -88,7 +88,7 @@ private: void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { Y_VERIFY(!blobRange.Offset); Y_VERIFY(blobRange.Size); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false)); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false)); } void SendResultAndDie(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 4117e9f57f..b745bfd4df 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -100,7 +100,7 @@ private: void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { Y_VERIFY(blobRange.Size); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false)); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false)); } void Index(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index aafb2592d0..90bf28a3c4 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -187,7 +187,10 @@ public: void SendReadRequest(const TActorContext& ctx, const NBlobCache::TBlobRange& blobRange) { Y_UNUSED(ctx); Y_VERIFY(blobRange.Size); - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange)); + + auto& externBlobs = ReadMetadata->ExternBlobs; + bool fallback = externBlobs && externBlobs->count(blobRange.BlobId); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback)); } STFUNC(StateWait) { diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp index 4d91b2b4fb..bd71bafdc7 100644 --- a/ydb/core/tx/columnshard/s3_actor.cpp +++ b/ydb/core/tx/columnshard/s3_actor.cpp @@ -13,6 +13,10 @@ using NWrappers::TEvS3Wrapper; namespace { +TString ExtractBlobPart(const NOlap::TBlobRange& blobRange, const TString& data) { + return TString(&data[blobRange.Offset], blobRange.Size); +} + struct TS3Export { std::unique_ptr<TEvPrivate::TEvExport> Event; THashSet<TString> KeysToWrite; @@ -130,7 +134,7 @@ public: auto& forget = Forgets[forgetNo]; for (auto& evict : forget.Event->Evicted) { - if (!evict.ExternBlob.IsValid()) { + if (!evict.ExternBlob.IsS3Blob()) { LOG_S_ERROR("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId); continue; } @@ -144,6 +148,26 @@ public: } } + void Handle(TEvPrivate::TEvGetExported::TPtr& ev) { + auto& evict = ev->Get()->Evicted; + if (!evict.ExternBlob.IsS3Blob()) { + LOG_S_ERROR("[S3] Get not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId); + return; + } + + TString key = evict.ExternBlob.GetS3Key(); + + bool reading = ReadingKeys.count(key); + ReadingKeys[key].emplace_back(ev->Release().Release()); + + if (!reading) { + ui64 blobSize = evict.ExternBlob.BlobSize(); + SendGetObject(key, {0, blobSize}); + } else { + LOG_S_DEBUG("[S3] Outstanding get key '" << key << "' at tablet " << TabletId); + } + } + // TODO: clean written blobs in failed export void Handle(TEvS3Wrapper::TEvPutObjectResponse::TPtr& ev) { Y_VERIFY(Initialized()); @@ -233,29 +257,6 @@ public: Forgets.erase(forgetNo); } } -#if 0 - void Handle(TEvS3Wrapper::TEvHeadObjectResponse::TPtr& ev) { - Y_VERIFY(Initialized()); - - auto& msg = *ev->Get(); - const auto& key = msg.Key; - const auto& resultOutcome = msg.Result; - - TString errStr; - if (!resultOutcome.IsSuccess()) { - errStr = LogError("HeadObjectResponse", resultOutcome.GetError(), !!key); - } - - if (!errStr.empty()) { - //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr)); - } - - Y_VERIFY(key); - ui64 contentLength = resultOutcome.GetResult().GetContentLength(); - LOG_S_DEBUG("HeadObjectResponse '" << *key << "', size: " << contentLength << " at tablet " << TabletId); - - //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {})); - } void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) { Y_VERIFY(Initialized()); @@ -270,18 +271,49 @@ public: errStr = LogError("GetObjectResponse", resultOutcome.GetError(), !!key); } - if (!errStr.empty()) { - //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr)); + if (!key || key->empty()) { + LOG_S_ERROR("[S3] no key in GetObjectResponse at tablet " << TabletId << ": " << errStr); + return; // nothing to do without key + } + + if (!ReadingKeys.count(*key)) { + LOG_S_ERROR("[S3] no reading keys for key " << *key << " at tablet " << TabletId); + return; // nothing to do without events } // TODO: CheckETag - Y_VERIFY(key); LOG_S_DEBUG("GetObjectResponse '" << *key << "', size: " << data.size() << " at tablet " << TabletId); - //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, data)); + auto status = errStr.empty() ? NKikimrProto::OK : NKikimrProto::ERROR; + + for (const auto& ev : ReadingKeys[*key]) { + auto result = std::make_unique<TEvColumnShard::TEvReadBlobRangesResult>(TabletId); + + for (const auto& blobRange : ev->BlobRanges) { + if (data.size() < blobRange.Offset + blobRange.Size) { + LOG_S_ERROR("GetObjectResponse '" << *key << "', data size: " << data.size() + << " is too small for blob range {" << blobRange.Offset << "," << blobRange.Size << "}" + << " at tablet " << TabletId); + status = NKikimrProto::ERROR; + } + + auto* res = result->Record.AddResults(); + auto* resRange = res->MutableBlobRange(); + resRange->SetBlobId(blobRange.BlobId.ToStringNew()); + resRange->SetOffset(blobRange.Offset); + resRange->SetSize(blobRange.Size); + res->SetStatus(status); + + if (status == NKikimrProto::OK) { + res->SetData(ExtractBlobPart(blobRange, data)); + } + } + + Send(ev->DstActor, result.release(), 0, ev->DstCookie); + } + ReadingKeys.erase(*key); } -#endif private: ui64 TabletId; @@ -294,18 +326,20 @@ private: THashMap<ui64, TS3Forget> Forgets; THashMap<TString, ui64> ExportingKeys; THashMap<TString, ui64> ForgettingKeys; + THashMap<TString, std::vector<std::unique_ptr<TEvPrivate::TEvGetExported>>> ReadingKeys; STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { hFunc(TEvPrivate::TEvS3Settings, Handle); hFunc(TEvPrivate::TEvExport, Handle); hFunc(TEvPrivate::TEvForget, Handle); + hFunc(TEvPrivate::TEvGetExported, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); hFunc(TEvS3Wrapper::TEvPutObjectResponse, Handle); hFunc(TEvS3Wrapper::TEvDeleteObjectResponse, Handle); + hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle); #if 0 hFunc(TEvS3Wrapper::TEvHeadObjectResponse, Handle); - hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle); #endif default: break; @@ -347,11 +381,11 @@ private: Send(S3Ctx.Client, new TEvS3Wrapper::TEvHeadObjectRequest(request)); } - void SendGetObject(const TString& key) { + void SendGetObject(const TString& key, const std::pair<ui64, ui64>& range) { auto request = Aws::S3::Model::GetObjectRequest() .WithBucket(Bucket) - .WithKey(key); - //.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); // TODO + .WithKey(key) + .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); LOG_S_DEBUG("[S3] GetObjectRequest key '" << key << "' at tablet " << TabletId); Send(S3Ctx.Client, new TEvS3Wrapper::TEvGetObjectRequest(request)); diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index cd37e85dc6..4f4462ef30 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -269,6 +269,8 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe TTestBasicRuntime runtime; TTester::Setup(runtime); + //runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); + TActorId sender = runtime.AllocateEdgeActor(); CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::COLUMNSHARD), |