aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-04-18 15:13:58 +0300
committerArtem Zuikov <chertus@gmail.com>2022-04-18 15:13:58 +0300
commitd16fc999f8322e88777cea8bde5b94eda867fb19 (patch)
tree2f7dba65f9dc23bc0975113cbe0cd1b907b7ff9c
parentc046b295578935b7ff07beb0913a315caa8f92de (diff)
downloadydb-d16fc999f8322e88777cea8bde5b94eda867fb19.tar.gz
KIKIMR-13593: read from S3
ref:dd077b5b7ea4145cc71dea1d4c3c515d491896d5
-rw-r--r--ydb/core/tx/columnshard/blob.h4
-rw-r--r--ydb/core/tx/columnshard/blob_cache.cpp181
-rw-r--r--ydb/core/tx/columnshard/blob_cache.h30
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp9
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h8
-rw-r--r--ydb/core/tx/columnshard/columnshard.h34
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp9
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp70
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp91
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_txs.h47
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp29
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h30
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h4
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp30
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp5
-rw-r--r--ydb/core/tx/columnshard/s3_actor.cpp100
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp2
21 files changed, 508 insertions, 202 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index 250b7db6eb..97cbb54b82 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -330,6 +330,10 @@ struct TEvictedBlob {
ui64 Hash() const noexcept {
return Blob.Hash();
}
+
+ bool IsExternal() const {
+ return ExternBlob.IsValid();
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp
index 1d676cd21a..0ef5cb89c7 100644
--- a/ydb/core/tx/columnshard/blob_cache.cpp
+++ b/ydb/core/tx/columnshard/blob_cache.cpp
@@ -13,6 +13,24 @@
namespace NKikimr::NBlobCache {
+namespace {
+ // Blobs with same tagret can be read in a single request
+ // (e.g. DS blobs from the same tablet residing on the same DS group, or 2 small blobs from the same tablet)
+ std::pair<ui64, ui32> BlobSource(const TUnifiedBlobId& blobId) {
+ Y_VERIFY(blobId.IsValid());
+
+ if (blobId.IsDsBlob()) {
+ // Tablet & group restriction
+ return {blobId.GetTabletId(), blobId.GetDsGroup()};
+ } else if (blobId.IsSmallBlob()) {
+ // Tablet restriction, no group restrictions
+ return {blobId.GetTabletId(), 0};
+ }
+
+ return {0, 0};
+ }
+}
+
using namespace NActors;
class TBlobCache: public TActorBootstrapped<TBlobCache> {
@@ -26,6 +44,11 @@ private:
{}
};
+ struct TReadItem {
+ TBlobRange BlobRange;
+ bool Fallback{false};
+ };
+
static constexpr i64 MAX_IN_FLIGHT_BYTES = 250ll << 20;
static constexpr i64 MAX_REQUEST_BYTES = 8ll << 20;
static constexpr TDuration DEFAULT_READ_DEADLINE = TDuration::Seconds(5);
@@ -35,17 +58,18 @@ private:
// It is used to remove all blob ranges from cache when
// it gets a notification that a blob has been deleted
TControlWrapper MaxCacheDataSize;
+ TControlWrapper MaxCacheExternalDataSize; // Cache size for extern (i.e. S3) blobs
TControlWrapper MaxInFlightDataSize;
i64 CacheDataSize; // Current size of all blobs in cache
ui64 ReadCookie;
THashMap<ui64, std::vector<TBlobRange>> CookieToRange; // All in-flight requests
THashMap<TBlobRange, TReadInfo> OutstandingReads; // All in-flight and enqueued reads
- TDeque<TBlobRange> ReadQueue; // Reads that are waiting to be sent
+ TDeque<TReadItem> ReadQueue; // Reads that are waiting to be sent
// TODO: Consider making per-group queues
i64 InFlightDataSize; // Current size of all in-flight blobs
THashMap<ui64, TActorId> ShardPipes; // TabletId -> PipeClient for small blob read requests
- THashMap<ui64, THashSet<ui64>> InFlightSmallBlobRequests; // TabletId -> list to read cookies
+ THashMap<ui64, THashSet<ui64>> InFlightTabletRequests; // TabletId -> list to read cookies
using TCounterPtr = NMonitoring::TDynamicCounters::TCounterPtr;
const TCounterPtr SizeBytes;
@@ -75,6 +99,7 @@ public:
: TActorBootstrapped<TBlobCache>()
, Cache(SIZE_MAX)
, MaxCacheDataSize(maxSize, 0, 1ull << 40)
+ , MaxCacheExternalDataSize(0, 0, 1ull << 40)
, MaxInFlightDataSize(Min<i64>(MaxCacheDataSize, MAX_IN_FLIGHT_BYTES), 0, 10ull << 30)
, CacheDataSize(0)
, ReadCookie(1)
@@ -98,8 +123,10 @@ public:
{}
void Bootstrap(const TActorContext& ctx) {
- AppData(ctx)->Icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize");
- AppData(ctx)->Icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize");
+ auto& icb = AppData(ctx)->Icb;
+ icb->RegisterSharedControl(MaxCacheDataSize, "BlobCache.MaxCacheDataSize");
+ icb->RegisterSharedControl(MaxCacheExternalDataSize, "BlobCache.MaxCacheExternalDataSize");
+ icb->RegisterSharedControl(MaxInFlightDataSize, "BlobCache.MaxInFlightDataSize");
Become(&TBlobCache::StateFunc);
ScheduleWakeup();
}
@@ -142,17 +169,18 @@ private:
void Handle(TEvBlobCache::TEvReadBlobRange::TPtr& ev, const TActorContext& ctx) {
const TBlobRange& blobRange = ev->Get()->BlobRange;
- const bool promote = ev->Get()->Cache;
+ const bool promote = ev->Get()->CacheAfterRead;
+ const bool fallback = ev->Get()->Fallback;
- LOG_S_DEBUG("Read request: " << blobRange);
+ LOG_S_DEBUG("Read request: " << blobRange << " cache: " << (ui32)promote << " fallback: " << (ui32)fallback);
- HandleSingleRangeRead(blobRange, promote, ev->Sender, ctx);
+ HandleSingleRangeRead(blobRange, promote, fallback, ev->Sender, ctx);
MakeReadRequests(ctx);
}
void HandleSingleRangeRead(const TBlobRange& blobRange,
- const bool promoteInCache, const TActorId& sender, const TActorContext& ctx)
+ bool promoteInCache, bool fallback, const TActorId& sender, const TActorContext& ctx)
{
// Is in cache?
auto it = promoteInCache ? Cache.Find(blobRange) : Cache.FindWithoutPromote(blobRange);
@@ -164,7 +192,13 @@ private:
Misses->Inc();
- // Is outsanding?
+ // Disable promoting reads for external blobs if MaxCacheExternalDataSize is zero. But keep promoting hits.
+ // For now MaxCacheExternalDataSize is just a disabled/enabled flag. TODO: real MaxCacheExternalDataSize
+ if (fallback && MaxCacheExternalDataSize == 0) {
+ promoteInCache = false;
+ }
+
+ // Is outstanding?
auto readIt = OutstandingReads.find(blobRange);
if (readIt != OutstandingReads.end()) {
readIt->second.Waiting.push_back(sender);
@@ -172,15 +206,16 @@ private:
return;
}
- EnqueueRead(blobRange, promoteInCache, sender);
+ EnqueueRead(blobRange, promoteInCache, sender, fallback);
}
void Handle(TEvBlobCache::TEvReadBlobRangeBatch::TPtr& ev, const TActorContext& ctx) {
const auto& ranges = ev->Get()->BlobRanges;
+ bool fallback = ev->Get()->Fallback;
LOG_S_DEBUG("Batch read request: " << JoinStrings(ranges.begin(), ranges.end(), " "));
for (const auto& blobRange : ranges) {
- HandleSingleRangeRead(blobRange, ev->Get()->Cache, ev->Sender, ctx);
+ HandleSingleRangeRead(blobRange, ev->Get()->CacheAfterRead, fallback, ev->Sender, ctx);
}
MakeReadRequests(ctx);
@@ -240,14 +275,14 @@ private:
CachedRanges.erase(blobIdIt);
}
- void EnqueueRead(const TBlobRange& blobRange, const bool promoteInCache, const TActorId& sender) {
+ void EnqueueRead(const TBlobRange& blobRange, bool promoteInCache, const TActorId& sender, bool fallback) {
TReadInfo& blobInfo = OutstandingReads[blobRange];
blobInfo.Waiting.push_back(sender);
blobInfo.Cache = promoteInCache;
LOG_S_DEBUG("Enqueue read range: " << blobRange);
- ReadQueue.push_back(blobRange);
+ ReadQueue.push_back(TReadItem{blobRange, fallback});
ReadsInQueue->Set(ReadQueue.size());
}
@@ -284,68 +319,74 @@ private:
ReadRequests->Inc();
}
- // Checks if 2 blobs can be read in a single request (e.g. DS blobs from the same
- // tablet residing on the same DS group, or 2 small blobs from the same tablet)
- inline bool CanBatchReads(const TUnifiedBlobId& a, const TUnifiedBlobId& b) {
- if (a.GetType() != b.GetType()) {
- return false;
- }
+ void MakeReadRequests(const TActorContext& ctx) {
+ THashMap<std::pair<ui64, ui32>, std::vector<TBlobRange>> groupedBlobRanges;
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> fallbackRanges;
+
+ while (!ReadQueue.empty()) {
+ const auto& readItem = ReadQueue.front();
+ const TBlobRange& blobRange = readItem.BlobRange;
+
+ if (readItem.Fallback) {
+ // For now it's always possible to add external read cause we must not block ReadQueue by such reads
+ // TODO: separate ReadQueue and InFlightDataSize for external reads
+ fallbackRanges[blobRange.BlobId].push_back(blobRange);
+ } else {
+ // NOTE: if queue is not empty, at least 1 in-flight request is allowed
+ if (InFlightDataSize && InFlightDataSize >= MaxInFlightDataSize) {
+ break;
+ }
+ InFlightDataSize += blobRange.Size;
- if (!a.IsValid()) {
- return false;
- }
+ std::pair<ui64, ui32> blobSrc = BlobSource(blobRange.BlobId);
+ groupedBlobRanges[blobSrc].push_back(blobRange);
+ }
- // Same tablet and same DS group?
- if (a.IsDsBlob()) {
- return a.GetTabletId() == b.GetTabletId() &&
- a.GetDsGroup() == b.GetDsGroup();
- }
+ SizeBytesInFlight->Add(blobRange.Size);
+ SizeBlobsInFlight->Inc();
- // Small blobs from the same tablet?
- if (a.IsSmallBlob()) {
- return a.GetTabletId() == b.GetTabletId();
+ ReadQueue.pop_front();
}
- return false;
- }
+ ReadsInQueue->Set(ReadQueue.size());
+
+ // We might need to free some space to accomodate the results of new reads
+ Evict(ctx);
- void MakeReadRequests(const TActorContext& ctx) {
- std::vector<TBlobRange> blobRanges;
ui64 cookie = ++ReadCookie;
- ui64 requestSize = 0;
- // NOTE: if queue is not empty, at least 1 in-flight request is allowed
- while (!ReadQueue.empty() && (InFlightDataSize == 0 || InFlightDataSize < MaxInFlightDataSize)) {
- const TBlobRange& blobRange = ReadQueue.front();
+ for (auto& [target, rangesGroup] : groupedBlobRanges) {
+ std::vector<TBlobRange> rangesBatch;
+ rangesBatch.reserve(rangesGroup.size());
+ ui64 requestSize = 0;
+
+ for (auto& blobRange : rangesGroup) {
+ if (!rangesBatch.empty() && (requestSize + blobRange.Size > MAX_REQUEST_BYTES)) {
+ SendBatchReadRequest(rangesBatch, cookie, ctx);
+ rangesBatch.clear();
+ cookie = ++ReadCookie;
+ requestSize = 0;
+ }
- // Only group ranges from the same Tablet and same DS group
- if (!blobRanges.empty() && (
- !CanBatchReads(blobRanges.back().BlobId, blobRange.BlobId) ||
- requestSize > MAX_REQUEST_BYTES
- ))
- {
- SendBatchReadRequest(blobRanges, cookie, ctx);
- blobRanges.clear();
- cookie = ++ReadCookie;
- requestSize = 0;
+ rangesBatch.push_back(blobRange);
+ requestSize += blobRange.Size;
+ CookieToRange[cookie].push_back(blobRange);
}
- blobRanges.push_back(blobRange);
- requestSize += blobRange.Size;
- CookieToRange[cookie].push_back(blobRange);
-
- SizeBytesInFlight->Add(blobRange.Size);
- SizeBlobsInFlight->Inc();
- InFlightDataSize += blobRange.Size;
+ if (!rangesBatch.empty()) {
+ SendBatchReadRequest(rangesBatch, cookie, ctx);
+ }
+ }
- // We might need to free some space to accomodate the results of new reads
- Evict(ctx);
+ for (auto& [blobId, ranges] : fallbackRanges) {
+ Y_VERIFY(blobId.IsDsBlob());
+ Y_VERIFY(!ranges.empty());
- ReadQueue.pop_front();
- }
- if (!blobRanges.empty()) {
- SendBatchReadRequest(blobRanges, cookie, ctx);
+ cookie = ++ReadCookie;
+ for (auto& blobRange : ranges) {
+ CookieToRange[cookie].push_back(blobRange);
+ }
+ SendBatchReadRequestToTablet(ranges, cookie, ctx);
}
- ReadsInQueue->Set(ReadQueue.size());
}
void SendResult(const TActorId& to, const TBlobRange& blobRange, NKikimrProto::EReplyStatus status,
@@ -445,15 +486,9 @@ private:
ShardPipes[tabletId] = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
}
- auto ev = std::make_unique<TEvColumnShard::TEvReadBlobRanges>();
- for (const auto& r : blobRanges) {
- auto* range = ev->Record.AddBlobRanges();
- range->SetBlobId(r.BlobId.ToStringNew());
- range->SetOffset(r.Offset);
- range->SetSize(r.Size);
- }
+ auto ev = std::make_unique<TEvColumnShard::TEvReadBlobRanges>(blobRanges);
- InFlightSmallBlobRequests[tabletId].insert(cookie);
+ InFlightTabletRequests[tabletId].insert(cookie);
NTabletPipe::SendData(ctx, ShardPipes[tabletId], ev.release(), cookie);
}
@@ -461,8 +496,8 @@ private:
void DestroyPipe(ui64 tabletId, const TActorContext& ctx) {
ShardPipes.erase(tabletId);
// Send errors for in-flight requests
- auto cookies = std::move(InFlightSmallBlobRequests[tabletId]);
- InFlightSmallBlobRequests.erase(tabletId);
+ auto cookies = std::move(InFlightTabletRequests[tabletId]);
+ InFlightTabletRequests.erase(tabletId);
for (ui64 readCookie : cookies) {
auto cookieIt = CookieToRange.find(readCookie);
if (cookieIt == CookieToRange.end()) {
@@ -509,7 +544,7 @@ private:
ui64 readCookie = ev->Cookie;
LOG_S_DEBUG("Got read result from tablet: " << tabletId);
- InFlightSmallBlobRequests[tabletId].erase(readCookie);
+ InFlightTabletRequests[tabletId].erase(readCookie);
auto cookieIt = CookieToRange.find(readCookie);
if (cookieIt == CookieToRange.end()) {
diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h
index 6901eb218b..68ae86f5e0 100644
--- a/ydb/core/tx/columnshard/blob_cache.h
+++ b/ydb/core/tx/columnshard/blob_cache.h
@@ -36,11 +36,13 @@ struct TEvBlobCache {
struct TEvReadBlobRange : public NActors::TEventLocal<TEvReadBlobRange, EvReadBlobRange> {
TBlobRange BlobRange;
- bool Cache;
- // TODO: pass some kind of priority?
- explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cache = true)
+ bool CacheAfterRead;
+ bool Fallback;
+
+ explicit TEvReadBlobRange(const TBlobRange& blobRange, bool cacheResult, bool fallback)
: BlobRange(blobRange)
- , Cache(cache)
+ , CacheAfterRead(cacheResult)
+ , Fallback(fallback)
{}
};
@@ -48,12 +50,20 @@ struct TEvBlobCache {
// This is usefull to save IOPs when reading multiple columns from the same blob
struct TEvReadBlobRangeBatch : public NActors::TEventLocal<TEvReadBlobRangeBatch, EvReadBlobRangeBatch> {
std::vector<TBlobRange> BlobRanges;
- bool Cache;
- // TODO: pass some kind of priority?
- explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cache = true)
- : BlobRanges(blobRanges)
- , Cache(cache)
- {}
+ bool CacheAfterRead;
+ bool Fallback;
+
+ explicit TEvReadBlobRangeBatch(std::vector<TBlobRange>&& blobRanges, bool cacheResult, bool fallback)
+ : BlobRanges(std::move(blobRanges))
+ , CacheAfterRead(cacheResult)
+ , Fallback(fallback)
+ {
+ if (fallback) {
+ for (const auto& blobRange : BlobRanges) {
+ Y_VERIFY(blobRange.BlobId == BlobRanges[0].BlobId);
+ }
+ }
+ }
};
struct TEvReadBlobRangeResult : public NActors::TEventLocal<TEvReadBlobRangeResult, EvReadBlobRangeResult> {
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 48c66c894e..bf958b428a 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -570,6 +570,15 @@ bool TBlobManager::LoadOneToOneExport(IBlobManagerDb& db) {
return true;
}
+TEvictedBlob TBlobManager::GetEvicted(const TUnifiedBlobId& blobId, TEvictMetadata& meta) {
+ auto it = EvictedBlobs.find(TEvictedBlob{.Blob = blobId});
+ if (it != EvictedBlobs.end()) {
+ meta = it->second;
+ return it->first;
+ }
+ return {};
+}
+
TEvictedBlob TBlobManager::GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) {
auto it = DroppedEvictedBlobs.find(TEvictedBlob{.Blob = blobId});
if (it != DroppedEvictedBlobs.end()) {
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index 12611e835e..8bb09011f9 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -95,8 +95,9 @@ public:
virtual bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) = 0;
virtual bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) = 0;
virtual bool LoadOneToOneExport(IBlobManagerDb& db) = 0;
- //virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0;
+ virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0;
virtual TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) = 0;
+ virtual bool HasExternBlobs() const = 0;
};
// Garbage Collection generation and step
@@ -236,8 +237,13 @@ public:
bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) override;
bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) override;
bool LoadOneToOneExport(IBlobManagerDb& db) override;
+ TEvictedBlob GetEvicted(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override;
TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override;
+ bool HasExternBlobs() const override {
+ return EvictedBlobs.size() || DroppedEvictedBlobs.size();
+ }
+
// Implementation of IBlobInUseTracker
void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override;
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 62486e3c33..de512d9e72 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -13,6 +13,10 @@
namespace NKikimr {
+namespace NColumnShard {
+class TBlobGroupSelector;
+}
+
struct TEvColumnShard {
enum EEv {
EvProposeTransaction = EventSpaceBegin(TKikimrEvents::ES_TX_COLUMNSHARD),
@@ -127,11 +131,39 @@ struct TEvColumnShard {
}
};
- // Read small blobs from the tablet
+ // Fallback read BlobCache read to tablet (small blobs or S3)
struct TEvReadBlobRanges : public TEventPB<TEvReadBlobRanges,
NKikimrTxColumnShard::TEvReadBlobRanges,
TEvColumnShard::EvReadBlobRanges>
{
+ std::vector<NOlap::TBlobRange> BlobRanges;
+
+ TEvReadBlobRanges() = default;
+
+ TEvReadBlobRanges(const std::vector<NOlap::TBlobRange>& blobRanges)
+ : BlobRanges(blobRanges)
+ {
+ for (const auto& r : BlobRanges) {
+ auto* range = Record.AddBlobRanges();
+ range->SetBlobId(r.BlobId.ToStringNew());
+ range->SetOffset(r.Offset);
+ range->SetSize(r.Size);
+ }
+ }
+
+ void RestoreFromProto(NColumnShard::TBlobGroupSelector* dsGroupSelector, TString& errString) {
+ BlobRanges.clear();
+ BlobRanges.reserve(Record.BlobRangesSize());
+
+ for (const auto& range : Record.GetBlobRanges()) {
+ auto blobId = NColumnShard::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector,
+ errString);
+ if (!errString.empty()) {
+ return;
+ }
+ BlobRanges.push_back(NOlap::TBlobRange{blobId, (ui32)range.GetOffset(), (ui32)range.GetSize()});
+ }
+ }
};
struct TEvReadBlobRangesResult : public TEventPB<TEvReadBlobRangesResult,
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 73723847c5..1db2290a4e 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -36,7 +36,7 @@ private:
};
-NOlap::TReadMetadata::TPtr
+std::shared_ptr<NOlap::TReadMetadata>
TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index,
@@ -250,13 +250,16 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read,
TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo()));
+ std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
- ReadMetadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
}
ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR;
- if (ReadMetadata) {
+ if (metadata) {
+ Self->MapExternBlobs(ctx, *metadata);
+ ReadMetadata = metadata;
status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
index b742a726a7..1dd732b3d1 100644
--- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
@@ -118,9 +118,75 @@ void TTxReadBlobRanges::Complete(const TActorContext& ctx) {
LOG_S_DEBUG("TTxReadBlobRanges.Complete at tablet " << Self->TabletID());
}
+static std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult>
+MakeErrorResponse(const TEvColumnShard::TEvReadBlobRanges& msg, ui64 tabletId, ui32 status) {
+ auto result = std::make_unique<TEvColumnShard::TEvReadBlobRangesResult>(tabletId);
+ for (const auto& range : msg.Record.GetBlobRanges()) {
+ auto* res = result->Record.AddResults();
+ res->MutableBlobRange()->CopyFrom(range);
+ res->SetStatus(status);
+ }
+ return result;
+}
+
void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) {
- LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record);
- Execute(new TTxReadBlobRanges(this, ev), ctx);
+ auto& msg = *ev->Get();
+
+ LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << msg.Record);
+
+ if (msg.BlobRanges.empty()) {
+ TBlobGroupSelector dsGroupSelector(Info());
+ TString errString;
+ msg.RestoreFromProto(&dsGroupSelector, errString);
+ Y_VERIFY_S(errString.empty(), errString);
+ }
+
+ std::optional<TUnifiedBlobId> evictedBlobId;
+ bool isSmall = false;
+ bool isFallback = false;
+ bool isOther = false;
+ for (const auto& range : msg.BlobRanges) {
+ auto& blobId = range.BlobId;
+ if (blobId.IsSmallBlob()) {
+ isSmall = true;
+ } else if (blobId.IsDsBlob()) {
+ isFallback = true;
+ if (evictedBlobId) {
+ // Can read only one blobId at a time (but multiple ranges from it)
+ Y_VERIFY(evictedBlobId == blobId);
+ } else {
+ evictedBlobId = blobId;
+ }
+ } else {
+ isOther = true;
+ }
+ }
+
+ Y_VERIFY(isSmall != isFallback && !isOther);
+
+ if (isSmall) {
+ Execute(new TTxReadBlobRanges(this, ev), ctx);
+ } else if (isFallback) {
+ Y_VERIFY(evictedBlobId->IsValid());
+
+ ui32 status = NKikimrProto::EReplyStatus::ERROR;
+
+ NKikimrTxColumnShard::TEvictMetadata meta;
+ auto evicted = BlobManager->GetEvicted(*evictedBlobId, meta);
+
+ if (!evicted.Blob.IsValid() || !evicted.ExternBlob.IsValid()) {
+ auto result = MakeErrorResponse(msg, TabletID(), status);
+ ctx.Send(ev->Sender, result.release(), 0, ev->Cookie);
+ }
+
+ TString tierName = meta.GetTierName();
+ Y_VERIFY(!tierName.empty());
+
+ if (!GetExportedBlob(ctx, ev->Sender, ev->Cookie, tierName, std::move(evicted), std::move(msg.BlobRanges))) {
+ auto result = MakeErrorResponse(msg, TabletID(), status);
+ ctx.Send(ev->Sender, result.release(), 0, ev->Cookie);
+ }
+ }
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8dc0483b55..943360b370 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -31,7 +31,7 @@ public:
TTxType GetTxType() const override { return TXTYPE_START_SCAN; }
private:
- NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
+ std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
bool isIndexStats, bool isReverse, ui64 limit);
private:
@@ -105,7 +105,10 @@ private:
if (!blobRange.BlobId.IsValid()) {
return false;
}
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange));
+
+ auto& externBlobs = ReadMetadataRanges[ReadMetadataIndex]->ExternBlobs;
+ bool fallback = externBlobs && externBlobs->count(blobRange.BlobId);
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback));
++InFlightReads;
InFlightReadBytes += blobRange.Size;
return true;
@@ -509,7 +512,7 @@ static void FillPredicatesFromRange(TReadDescription& read, const ::NKikimrTx::T
}
}
-NOlap::TReadStatsMetadata::TPtr
+std::shared_ptr<NOlap::TReadStatsMetadata>
PrepareStatsReadMetadata(ui64 tabletId, const TReadDescription& read, const std::unique_ptr<NOlap::IColumnEngine>& index, TString& error) {
THashSet<ui32> readColumnIds(read.ColumnIds.begin(), read.ColumnIds.end());
for (auto& [id, name] : read.ProgramSourceColumns) {
@@ -567,10 +570,10 @@ PrepareStatsReadMetadata(ui64 tabletId, const TReadDescription& read, const std:
return out;
}
-NOlap::TReadMetadataBase::TConstPtr TTxScan::CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
+std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
bool indexStats, bool isReverse, ui64 itemsLimit)
{
- NOlap::TReadMetadataBase::TPtr metadata;
+ std::shared_ptr<NOlap::TReadMetadataBase> metadata;
if (indexStats) {
metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription);
} else {
@@ -641,6 +644,9 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
if (!record.RangesSize()) {
auto range = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit);
if (range) {
+ if (!isIndexStats) {
+ Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*range));
+ }
ReadMetadataRanges = {range};
}
return true;
@@ -659,6 +665,9 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
ReadMetadataRanges.clear();
return true;
}
+ if (!isIndexStats) {
+ Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*newRange));
+ }
ReadMetadataRanges.emplace_back(newRange);
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 8e2d7f05ad..f0ad337377 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -698,13 +698,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
ActiveCompaction = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges,
Settings.CacheDataAfterCompaction);
- return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev));
+ return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager);
}
std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls,
bool force) {
if (ActiveTtl) {
- LOG_S_DEBUG("Ttl already in progress at tablet " << TabletID());
+ LOG_S_DEBUG("TTL already in progress at tablet " << TabletID());
return {};
}
if (!PrimaryIndex) {
@@ -740,7 +740,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
ActiveTtl = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false);
- return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), needWrites);
+ return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
@@ -789,7 +789,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes , false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes, false);
ev->PutStatus = NKikimrProto::OK; // No new blobs to write
ActiveCleanup = true;
@@ -859,38 +859,79 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
return indexInfo;
}
-void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
- THashMap<TUnifiedBlobId, TString>&& blobsIds) {
- if (!S3Actors.count(tierName)) {
- TString tier(tierName);
- LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID());
+void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) {
+ if (!metadata.SelectInfo) {
return;
}
- auto& s3 = S3Actors[tierName];
- if (!s3) {
- TString tier(tierName);
- LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID());
+
+ if (!BlobManager->HasExternBlobs()) {
return;
}
- auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds));
- ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
+
+ THashSet<TUnifiedBlobId> uniqBlobs;
+ for (auto& portion : metadata.SelectInfo->Portions) {
+ for (auto& rec : portion.Records) {
+ uniqBlobs.insert(rec.BlobRange.BlobId);
+ }
+ }
+
+ THashMap<TUnifiedBlobId, TUnifiedBlobId> extMap;
+
+ for (auto& blobId : uniqBlobs) {
+ TEvictMetadata meta;
+ auto evicted = BlobManager->GetEvicted(blobId, meta);
+ if (evicted.ExternBlob.IsValid()) {
+ extMap[blobId] = evicted.ExternBlob;
+ }
+ }
+
+ if (!extMap.empty()) {
+ metadata.ExternBlobs = std::make_shared<const THashMap<TUnifiedBlobId, TUnifiedBlobId>>(std::move(extMap));
+ }
}
-void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName,
- std::vector<NOlap::TEvictedBlob>&& blobs) {
+TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) {
if (!S3Actors.count(tierName)) {
- LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID());
- return;
+ LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
+ return {};
}
- auto& s3 = S3Actors[tierName];
+ auto s3 = S3Actors[tierName];
if (!s3) {
- LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID());
- return;
+ LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
+ return {};
+ }
+ return s3;
+}
+
+void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
+ THashMap<TUnifiedBlobId, TString>&& blobsIds) {
+ if (auto s3 = GetS3ActorForTier(tierName, "export")) {
+ auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds));
+ ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
}
+}
+
+void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName,
+ std::vector<NOlap::TEvictedBlob>&& blobs) {
+ if (auto s3 = GetS3ActorForTier(tierName, "forget")) {
+ auto forget = std::make_unique<TEvPrivate::TEvForget>();
+ forget->Evicted = std::move(blobs);
+ ctx.Send(s3, forget.release());
+ }
+}
- auto forget = std::make_unique<TEvPrivate::TEvForget>();
- forget->Evicted = std::move(blobs);
- ctx.Send(s3, forget.release());
+bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
+ NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) {
+ if (auto s3 = GetS3ActorForTier(tierName, "get exported")) {
+ auto get = std::make_unique<TEvPrivate::TEvGetExported>();
+ get->DstActor = dst;
+ get->DstCookie = cookie;
+ get->Evicted = std::move(evicted);
+ get->BlobRanges = std::move(ranges);
+ ctx.Send(s3, get.release());
+ return true;
+ }
+ return false;
}
ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 6591cae2f9..943263d2a4 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -426,9 +426,13 @@ private:
void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions);
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
+ void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
+ TActorId GetS3ActorForTier(const TString& tierName, const TString& phase);
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
THashMap<TUnifiedBlobId, TString>&& blobsIds);
void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs);
+ bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
+ NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges);
ui32 InitS3Actors(const TActorContext& ctx, bool init);
void StopS3Actors(const TActorContext& ctx);
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index 076efaa758..4f6a0e5372 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -23,6 +23,7 @@ struct TEvPrivate {
EvS3Settings,
EvExport,
EvForget,
+ EvGetExported,
EvEnd
};
@@ -60,21 +61,52 @@ struct TEvPrivate {
struct TEvCompaction : public TEventLocal<TEvCompaction, EvIndexing> {
std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges;
+ THashSet<TUnifiedBlobId> Externals;
- explicit TEvCompaction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent)
+ explicit TEvCompaction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, IBlobExporter& blobManager)
: TxEvent(std::move(txEvent))
{
TxEvent->GranuleCompaction = true;
+ Y_VERIFY(TxEvent->IndexChanges);
+
+ GroupedBlobRanges = NOlap::TColumnEngineChanges::GroupedBlobRanges(TxEvent->IndexChanges->SwitchedPortions);
+
+ if (blobManager.HasExternBlobs()) {
+ for (auto& [blobId, _] : GroupedBlobRanges) {
+ TEvictMetadata meta;
+ if (blobManager.GetEvicted(blobId, meta).IsExternal()) {
+ Externals.insert(blobId);
+ }
+ }
+ }
}
};
struct TEvEviction : public TEventLocal<TEvEviction, EvEviction> {
std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GroupedBlobRanges;
+ THashSet<TUnifiedBlobId> Externals;
- explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, bool needWrites)
+ explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, IBlobExporter& blobManager,
+ bool needWrites)
: TxEvent(std::move(txEvent))
{
- if (!needWrites) {
+ Y_VERIFY(TxEvent->IndexChanges);
+
+ if (needWrites) {
+ GroupedBlobRanges =
+ NOlap::TColumnEngineChanges::GroupedBlobRanges(TxEvent->IndexChanges->PortionsToEvict);
+
+ if (blobManager.HasExternBlobs()) {
+ for (auto& [blobId, _] : GroupedBlobRanges) {
+ TEvictMetadata meta;
+ if (blobManager.GetEvicted(blobId, meta).IsExternal()) {
+ Externals.insert(blobId);
+ }
+ }
+ }
+ } else {
TxEvent->PutStatus = NKikimrProto::OK;
}
}
@@ -132,6 +164,13 @@ struct TEvPrivate {
TString ErrorStr;
};
+ struct TEvGetExported : public TEventLocal<TEvGetExported, EvGetExported> {
+ TActorId DstActor; // It's a BlobCache actor. S3 actor sends TEvReadBlobRangesResult to it as result
+ ui64 DstCookie;
+ NOlap::TEvictedBlob Evicted;
+ std::vector<NOlap::TBlobRange> BlobRanges;
+ };
+
struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> {
TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {}
ui64 Rows;
@@ -209,7 +248,7 @@ protected:
: TBase(self)
{}
- NOlap::TReadMetadata::TPtr PrepareReadMetadata(
+ std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata(
const TActorContext& ctx,
const TReadDescription& readDescription,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 1e61e2b5f1..b64ed17140 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -18,7 +18,7 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
{}
- void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) {
auto& event = *ev->Get();
TxEvent = std::move(event.TxEvent);
Y_VERIFY(TxEvent);
@@ -30,23 +30,14 @@ public:
LOG_S_DEBUG("Granules compaction: " << *indexChanges << " at tablet " << TabletId);
- auto& switchedPortions = indexChanges->SwitchedPortions;
- Y_VERIFY(switchedPortions.size());
+ for (auto& [blobId, ranges] : event.GroupedBlobRanges) {
+ Y_VERIFY(!ranges.empty());
- for (auto& portionInfo : switchedPortions) {
- Y_VERIFY(!portionInfo.Empty());
- std::vector<NBlobCache::TBlobRange> ranges;
- for (auto& rec : portionInfo.Records) {
- auto& blobRange = rec.BlobRange;
+ for (const auto& blobRange : ranges) {
+ Y_VERIFY(blobId == blobRange.BlobId);
Blobs[blobRange] = {};
- // Group only ranges from the same blob into one request
- if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) {
- SendReadRequest(ctx, std::move(ranges));
- ranges = {};
- }
- ranges.push_back(blobRange);
}
- SendReadRequest(ctx, std::move(ranges));
+ SendReadRequest(std::move(ranges), event.Externals.count(blobId));
}
}
@@ -110,11 +101,11 @@ private:
NumRead = 0;
}
- void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) {
- if (ranges.empty())
- return;
+ void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
+ Y_VERIFY(!ranges.empty());
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false));
+ Send(BlobCacheActorId,
+ new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal));
}
void CompactGranules(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 8fb6ac424a..c7d9b20a6f 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -161,6 +161,36 @@ public:
return size;
}
+ static THashMap<TUnifiedBlobId, std::vector<TBlobRange>>
+ GroupedBlobRanges(const TVector<TPortionInfo>& portions) {
+ Y_VERIFY(portions.size());
+
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges;
+ for (auto& portionInfo : portions) {
+ Y_VERIFY(!portionInfo.Empty());
+
+ for (auto& rec : portionInfo.Records) {
+ sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange);
+ }
+ }
+ return sameBlobRanges;
+ }
+
+ static THashMap<TUnifiedBlobId, std::vector<TBlobRange>>
+ GroupedBlobRanges(const TVector<std::pair<TPortionInfo, TString>>& portions) {
+ Y_VERIFY(portions.size());
+
+ THashMap<TUnifiedBlobId, std::vector<TBlobRange>> sameBlobRanges;
+ for (auto& [portionInfo, _] : portions) {
+ Y_VERIFY(!portionInfo.Empty());
+
+ for (auto& rec : portionInfo.Records) {
+ sameBlobRanges[rec.BlobRange.BlobId].push_back(rec.BlobRange);
+ }
+ }
+ return sameBlobRanges;
+ }
+
friend IOutputStream& operator << (IOutputStream& out, const TColumnEngineChanges& changes) {
if (ui32 switched = changes.SwitchedPortions.size()) {
out << "switch " << switched << " portions";
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 938c141fbc..7509cc1dac 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -31,7 +31,6 @@ struct TReadStats {
// Holds all metedata that is needed to perform read/scan
struct TReadMetadataBase {
- using TPtr = std::shared_ptr<TReadMetadataBase>;
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
enum class ESorting {
@@ -48,6 +47,7 @@ struct TReadMetadataBase {
std::shared_ptr<arrow::Schema> LoadSchema; // ResultSchema + required for intermediate operations
std::shared_ptr<arrow::Schema> ResultSchema; // TODO: add Program modifications
std::vector<std::shared_ptr<NArrow::TProgramStep>> Program;
+ std::shared_ptr<const THashMap<TUnifiedBlobId, TUnifiedBlobId>> ExternBlobs; // DS -> S3 map TODO: move out of base
ESorting Sorting{ESorting::ASC}; // Sorting inside returned batches
ui64 Limit{0}; // TODO
@@ -74,7 +74,6 @@ struct TReadMetadataBase {
// Holds all metadata that is needed to perform read/scan
struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadMetadata> {
- using TPtr = std::shared_ptr<TReadMetadata>;
using TConstPtr = std::shared_ptr<const TReadMetadata>;
TIndexInfo IndexInfo;
@@ -154,7 +153,6 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
};
struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> {
- using TPtr = std::shared_ptr<TReadStatsMetadata>;
using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;
const ui64 TabletId;
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index da752e96d2..4cdef39b0b 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -18,7 +18,7 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
{}
- void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& /*ctx*/) {
auto& event = *ev->Get();
TxEvent = std::move(event.TxEvent);
Y_VERIFY(TxEvent);
@@ -29,23 +29,14 @@ public:
LOG_S_DEBUG("Portions eviction: " << *indexChanges << " at tablet " << TabletId);
- auto& evictedPortions = indexChanges->PortionsToEvict;
- Y_VERIFY(evictedPortions.size());
+ for (auto& [blobId, ranges] : event.GroupedBlobRanges) {
+ Y_VERIFY(!ranges.empty());
- for (auto& [portionInfo, tierName] : evictedPortions) {
- Y_VERIFY(!portionInfo.Empty());
- std::vector<NBlobCache::TBlobRange> ranges;
- for (auto& rec : portionInfo.Records) {
- auto& blobRange = rec.BlobRange;
+ for (const auto& blobRange : ranges) {
+ Y_VERIFY(blobId == blobRange.BlobId);
Blobs[blobRange] = {};
- // Group only ranges from the same blob into one request
- if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) {
- SendReadRequest(ctx, std::move(ranges));
- ranges = {};
- }
- ranges.push_back(blobRange);
}
- SendReadRequest(ctx, std::move(ranges));
+ SendReadRequest(std::move(ranges), event.Externals.count(blobId));
}
}
@@ -109,12 +100,11 @@ private:
NumRead = 0;
}
- void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) {
- if (ranges.empty()) {
- return;
- }
+ void SendReadRequest(std::vector<NBlobCache::TBlobRange>&& ranges, bool isExternal) {
+ Y_VERIFY(!ranges.empty());
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false));
+ Send(BlobCacheActorId,
+ new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false, isExternal));
}
void EvictPortions(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index 3963279ffc..4038f1cde3 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -88,7 +88,7 @@ private:
void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
Y_VERIFY(!blobRange.Offset);
Y_VERIFY(blobRange.Size);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false));
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false));
}
void SendResultAndDie(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 4117e9f57f..b745bfd4df 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -100,7 +100,7 @@ private:
void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
Y_VERIFY(blobRange.Size);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false));
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false, false));
}
void Index(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index aafb2592d0..90bf28a3c4 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -187,7 +187,10 @@ public:
void SendReadRequest(const TActorContext& ctx, const NBlobCache::TBlobRange& blobRange) {
Y_UNUSED(ctx);
Y_VERIFY(blobRange.Size);
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange));
+
+ auto& externBlobs = ReadMetadata->ExternBlobs;
+ bool fallback = externBlobs && externBlobs->count(blobRange.BlobId);
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, true, fallback));
}
STFUNC(StateWait) {
diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp
index 4d91b2b4fb..bd71bafdc7 100644
--- a/ydb/core/tx/columnshard/s3_actor.cpp
+++ b/ydb/core/tx/columnshard/s3_actor.cpp
@@ -13,6 +13,10 @@ using NWrappers::TEvS3Wrapper;
namespace {
+TString ExtractBlobPart(const NOlap::TBlobRange& blobRange, const TString& data) {
+ return TString(&data[blobRange.Offset], blobRange.Size);
+}
+
struct TS3Export {
std::unique_ptr<TEvPrivate::TEvExport> Event;
THashSet<TString> KeysToWrite;
@@ -130,7 +134,7 @@ public:
auto& forget = Forgets[forgetNo];
for (auto& evict : forget.Event->Evicted) {
- if (!evict.ExternBlob.IsValid()) {
+ if (!evict.ExternBlob.IsS3Blob()) {
LOG_S_ERROR("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId);
continue;
}
@@ -144,6 +148,26 @@ public:
}
}
+ void Handle(TEvPrivate::TEvGetExported::TPtr& ev) {
+ auto& evict = ev->Get()->Evicted;
+ if (!evict.ExternBlob.IsS3Blob()) {
+ LOG_S_ERROR("[S3] Get not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId);
+ return;
+ }
+
+ TString key = evict.ExternBlob.GetS3Key();
+
+ bool reading = ReadingKeys.count(key);
+ ReadingKeys[key].emplace_back(ev->Release().Release());
+
+ if (!reading) {
+ ui64 blobSize = evict.ExternBlob.BlobSize();
+ SendGetObject(key, {0, blobSize});
+ } else {
+ LOG_S_DEBUG("[S3] Outstanding get key '" << key << "' at tablet " << TabletId);
+ }
+ }
+
// TODO: clean written blobs in failed export
void Handle(TEvS3Wrapper::TEvPutObjectResponse::TPtr& ev) {
Y_VERIFY(Initialized());
@@ -233,29 +257,6 @@ public:
Forgets.erase(forgetNo);
}
}
-#if 0
- void Handle(TEvS3Wrapper::TEvHeadObjectResponse::TPtr& ev) {
- Y_VERIFY(Initialized());
-
- auto& msg = *ev->Get();
- const auto& key = msg.Key;
- const auto& resultOutcome = msg.Result;
-
- TString errStr;
- if (!resultOutcome.IsSuccess()) {
- errStr = LogError("HeadObjectResponse", resultOutcome.GetError(), !!key);
- }
-
- if (!errStr.empty()) {
- //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr));
- }
-
- Y_VERIFY(key);
- ui64 contentLength = resultOutcome.GetResult().GetContentLength();
- LOG_S_DEBUG("HeadObjectResponse '" << *key << "', size: " << contentLength << " at tablet " << TabletId);
-
- //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}));
- }
void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) {
Y_VERIFY(Initialized());
@@ -270,18 +271,49 @@ public:
errStr = LogError("GetObjectResponse", resultOutcome.GetError(), !!key);
}
- if (!errStr.empty()) {
- //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr));
+ if (!key || key->empty()) {
+ LOG_S_ERROR("[S3] no key in GetObjectResponse at tablet " << TabletId << ": " << errStr);
+ return; // nothing to do without key
+ }
+
+ if (!ReadingKeys.count(*key)) {
+ LOG_S_ERROR("[S3] no reading keys for key " << *key << " at tablet " << TabletId);
+ return; // nothing to do without events
}
// TODO: CheckETag
- Y_VERIFY(key);
LOG_S_DEBUG("GetObjectResponse '" << *key << "', size: " << data.size() << " at tablet " << TabletId);
- //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, data));
+ auto status = errStr.empty() ? NKikimrProto::OK : NKikimrProto::ERROR;
+
+ for (const auto& ev : ReadingKeys[*key]) {
+ auto result = std::make_unique<TEvColumnShard::TEvReadBlobRangesResult>(TabletId);
+
+ for (const auto& blobRange : ev->BlobRanges) {
+ if (data.size() < blobRange.Offset + blobRange.Size) {
+ LOG_S_ERROR("GetObjectResponse '" << *key << "', data size: " << data.size()
+ << " is too small for blob range {" << blobRange.Offset << "," << blobRange.Size << "}"
+ << " at tablet " << TabletId);
+ status = NKikimrProto::ERROR;
+ }
+
+ auto* res = result->Record.AddResults();
+ auto* resRange = res->MutableBlobRange();
+ resRange->SetBlobId(blobRange.BlobId.ToStringNew());
+ resRange->SetOffset(blobRange.Offset);
+ resRange->SetSize(blobRange.Size);
+ res->SetStatus(status);
+
+ if (status == NKikimrProto::OK) {
+ res->SetData(ExtractBlobPart(blobRange, data));
+ }
+ }
+
+ Send(ev->DstActor, result.release(), 0, ev->DstCookie);
+ }
+ ReadingKeys.erase(*key);
}
-#endif
private:
ui64 TabletId;
@@ -294,18 +326,20 @@ private:
THashMap<ui64, TS3Forget> Forgets;
THashMap<TString, ui64> ExportingKeys;
THashMap<TString, ui64> ForgettingKeys;
+ THashMap<TString, std::vector<std::unique_ptr<TEvPrivate::TEvGetExported>>> ReadingKeys;
STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvS3Settings, Handle);
hFunc(TEvPrivate::TEvExport, Handle);
hFunc(TEvPrivate::TEvForget, Handle);
+ hFunc(TEvPrivate::TEvGetExported, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
hFunc(TEvS3Wrapper::TEvPutObjectResponse, Handle);
hFunc(TEvS3Wrapper::TEvDeleteObjectResponse, Handle);
+ hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle);
#if 0
hFunc(TEvS3Wrapper::TEvHeadObjectResponse, Handle);
- hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle);
#endif
default:
break;
@@ -347,11 +381,11 @@ private:
Send(S3Ctx.Client, new TEvS3Wrapper::TEvHeadObjectRequest(request));
}
- void SendGetObject(const TString& key) {
+ void SendGetObject(const TString& key, const std::pair<ui64, ui64>& range) {
auto request = Aws::S3::Model::GetObjectRequest()
.WithBucket(Bucket)
- .WithKey(key);
- //.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); // TODO
+ .WithKey(key)
+ .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
LOG_S_DEBUG("[S3] GetObjectRequest key '" << key << "' at tablet " << TabletId);
Send(S3Ctx.Client, new TEvS3Wrapper::TEvGetObjectRequest(request));
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index cd37e85dc6..4f4462ef30 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -269,6 +269,8 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
TTestBasicRuntime runtime;
TTester::Setup(runtime);
+ //runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
+
TActorId sender = runtime.AllocateEdgeActor();
CreateTestBootstrapper(runtime,
CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::COLUMNSHARD),