diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-06 12:58:27 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-06 12:58:27 +0300 |
commit | e9326a3327ea548d69b98df2718079a2617d0315 (patch) | |
tree | 299801801810746c7dd0ac59a595d44c9319e04f | |
parent | 2a7533a248335565c1573eda193110fdf25781b3 (diff) | |
download | ydb-e9326a3327ea548d69b98df2718079a2617d0315.tar.gz |
prevent evicting duplication and ttl conflict
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__export.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_txs.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/export_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/s3_actor.cpp | 6 |
10 files changed, 95 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index db29182c8c0..5d67ee6e56b 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -693,4 +693,32 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { NBlobCache::ForgetBlob(blobId); } +bool TBlobManager::IsEvicting(const TUnifiedBlobId& id) { + TEvictMetadata meta; + return GetEvicted(id, meta).State == EEvictState::EVICTING; +} + +bool TBlobManager::ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped /*= false*/) { + if (fromDropped) { + if (DroppedEvictedBlobs.count(evict)) { + auto node = DroppedEvictedBlobs.extract(evict); + if (!node.empty()) { + evict = node.key(); + meta = node.mapped(); + return true; + } + } + } else { + if (EvictedBlobs.count(evict)) { + auto node = EvictedBlobs.extract(evict); + if (!node.empty()) { + evict = node.key(); + meta = node.mapped(); + return true; + } + } + } + return false; +} + } diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index 0d30720126b..f1bd7af5b3e 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -226,6 +226,7 @@ public: CountersUpdate = TBlobManagerCounters(); return res; } + bool IsEvicting(const TUnifiedBlobId& id); // Implementation of IBlobManager interface TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override; @@ -255,28 +256,7 @@ private: // Delete small blobs that were previously in use and could not be deleted void PerformDelayedDeletes(IBlobManagerDb& db); - bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false) { - if (fromDropped) { - if (DroppedEvictedBlobs.count(evict)) { - auto node = DroppedEvictedBlobs.extract(evict); - if (!node.empty()) { - evict = node.key(); - meta = node.mapped(); - return true; - } - } - } else { - if (EvictedBlobs.count(evict)) { - auto node = EvictedBlobs.extract(evict); - if (!node.empty()) { - evict = node.key(); - meta = node.mapped(); - return true; - } - } - } - return false; - } + bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false); }; } diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp index d92e6e8b9a2..2caf42183a8 100644 --- a/ydb/core/tx/columnshard/columnshard__export.cpp +++ b/ydb/core/tx/columnshard/columnshard__export.cpp @@ -1,16 +1,17 @@ #include "columnshard_impl.h" #include "blob_manager_db.h" +#include "columnshard_schema.h" namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -class TTxExport : public TTransactionBase<TColumnShard> { +class TTxExportFinish: public TTransactionBase<TColumnShard> { public: - TTxExport(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev) + TTxExportFinish(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev) : TBase(self) - , Ev(ev) - {} + , Ev(ev) { + } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; @@ -22,9 +23,9 @@ private: }; -bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { +bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(Ev); - LOG_S_DEBUG("TTxExport.Execute at tablet " << Self->TabletID()); + LOG_S_DEBUG("TTxExportFinish.Execute at tablet " << Self->TabletID()); txc.DB.NoMoreReadsForTx(); //NIceDb::TNiceDb db(txc.DB); @@ -92,13 +93,13 @@ bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { return true; } -void TTxExport::Complete(const TActorContext& ctx) { +void TTxExportFinish::Complete(const TActorContext& ctx) { Y_VERIFY(Ev); - LOG_S_DEBUG("TTxExport.Complete at tablet " << Self->TabletID()); + LOG_S_DEBUG("TTxExportFinish.Complete at tablet " << Self->TabletID()); auto& msg = *Ev->Get(); Y_VERIFY(!msg.TierName.empty()); - + Self->ActiveEviction = false; if (!BlobsToForget.empty()) { Self->ForgetBlobs(ctx, msg.TierName, std::move(BlobsToForget)); } @@ -107,26 +108,28 @@ void TTxExport::Complete(const TActorContext& ctx) { void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx) { auto status = ev->Get()->Status; + + Y_VERIFY(!ActiveTtl, "TTL already in progress at tablet %lu", TabletID()); + Y_VERIFY(!ActiveEviction || status != NKikimrProto::UNKNOWN, "Eviction in progress at tablet %lu", TabletID()); ui64 exportNo = ev->Get()->ExportNo; auto& tierName = ev->Get()->TierName; - bool error = status == NKikimrProto::ERROR; - if (error) { + if (status == NKikimrProto::ERROR) { LOG_S_WARN("Export (fail): " << exportNo << " tier '" << tierName << "' error: " - << ev->Get()->ErrorStr << "' at tablet " << TabletID()); + << ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID()); + ActiveEviction = false; } else if (status == NKikimrProto::UNKNOWN) { LOG_S_DEBUG("Export (write): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); - auto& tierBlobs = ev->Get()->Blobs; Y_VERIFY(tierBlobs.size()); ExportBlobs(ctx, exportNo, tierName, std::move(tierBlobs)); } else if (status == NKikimrProto::OK) { LOG_S_DEBUG("Export (apply): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); - - Execute(new TTxExport(this, ev), ctx); + Execute(new TTxExportFinish(this, ev), ctx); } else { Y_VERIFY(false); } + ActiveEviction = true; } } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 8f35ec52a4a..e0233685047 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -25,7 +25,7 @@ public: private: TEvPrivate::TEvWriteIndex::TPtr Ev; THashMap<TUnifiedBlobId, TString> BlobsToExport; - THashMap<TString, THashMap<TUnifiedBlobId, TString>> ExportTierBlobs; + THashMap<TString, THashSet<TUnifiedBlobId>> ExportTierBlobs; THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget; ui64 ExportNo = 0; }; @@ -217,7 +217,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) if (BlobsToExport.size()) { size_t numBlobs = BlobsToExport.size(); for (auto& [blobId, tierName] : BlobsToExport) { - ExportTierBlobs[tierName].emplace(blobId, TString{}); + ExportTierBlobs[tierName].emplace(blobId); } BlobsToExport.clear(); @@ -280,7 +280,15 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { for (auto& [tierName, blobIds] : ExportTierBlobs) { Y_VERIFY(ExportNo); - ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobIds))); + + TEvPrivate::TEvExport::TBlobDataMap blobsData; + for (auto&& i : blobIds) { + TEvPrivate::TEvExport::TExportBlobInfo info; + info.Evicting = Self->BlobManager->IsEvicting(i); + blobsData.emplace(i, std::move(info)); + } + + ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobsData))); ++ExportNo; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 99b9ee07710..d469f0bc599 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -861,12 +861,13 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta } } -TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) { - if (!S3Actors.count(tierName)) { +TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) const { + auto it = S3Actors.find(tierName); + if (it == S3Actors.end()) { LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); return {}; } - auto s3 = S3Actors[tierName]; + auto s3 = it->second; if (!s3) { LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); return {}; @@ -875,15 +876,14 @@ TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& } void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, - THashMap<TUnifiedBlobId, TString>&& blobsIds) { + TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const { if (auto s3 = GetS3ActorForTier(tierName, "export")) { - auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds)); + auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo)); ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); } } -void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, - std::vector<NOlap::TEvictedBlob>&& blobs) { +void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const { if (auto s3 = GetS3ActorForTier(tierName, "forget")) { auto forget = std::make_unique<TEvPrivate::TEvForget>(); forget->Evicted = std::move(blobs); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 5ec77f71a9c..b7e49660e9c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -34,7 +34,7 @@ IActor* CreateReadActor(ui64 tabletId, const TActorId& columnShardActorId, ui64 requestCookie); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); -IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); +IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); #ifndef KIKIMR_DISABLE_S3_OPS IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName); #endif @@ -85,7 +85,7 @@ class TColumnShard friend class TTxRead; friend class TTxScan; friend class TTxWriteIndex; - friend class TTxExport; + friend class TTxExportFinish; friend class TTxForget; friend class TTxRunGC; friend class TTxProcessGCResult; @@ -382,6 +382,7 @@ private: bool ActiveIndexingOrCompaction = false; bool ActiveCleanup = false; bool ActiveTtl = false; + bool ActiveEviction = false; std::unique_ptr<TBlobManager> BlobManager; TInFlightReadsTracker InFlightReadsTracker; TSettings Settings; @@ -442,10 +443,10 @@ private: NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); - TActorId GetS3ActorForTier(const TString& tierName, const TString& phase); + TActorId GetS3ActorForTier(const TString& tierName, const TString& phase) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, - THashMap<TUnifiedBlobId, TString>&& blobsIds); - void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs); + TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const; + void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const; bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges); ui32 InitS3Actors(const TActorContext& ctx, bool init); diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h index 4f6a0e5372b..4616ae298ab 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_txs.h @@ -125,15 +125,19 @@ struct TEvPrivate { }; struct TEvExport : public TEventLocal<TEvExport, EvExport> { - using TBlobDataMap = THashMap<TUnifiedBlobId, TString>; + struct TExportBlobInfo { + TString Data; + bool Evicting = false; + }; + using TBlobDataMap = THashMap<TUnifiedBlobId, TExportBlobInfo>; NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; - ui64 ExportNo{}; + ui64 ExportNo = 0; TString TierName; TActorId DstActor; TBlobDataMap Blobs; THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; - TString ErrorStr; + TMap<TString, TString> ErrorStrings; explicit TEvExport(ui64 exportNo, const TString& tierName, TBlobDataMap&& tierBlobs) : ExportNo(exportNo) @@ -156,9 +160,17 @@ struct TEvPrivate { Y_VERIFY(DstActor); Y_VERIFY(!Blobs.empty()); } + + TString SerializeErrorsToString() const { + TStringBuilder sb; + for (auto&& i : ErrorStrings) { + sb << i.first << "=" << i.second << ";"; + } + return sb; + } }; - struct TEvForget : public TEventLocal<TEvForget, EvForget> { + struct TEvForget: public TEventLocal<TEvForget, EvForget> { NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; std::vector<NOlap::TEvictedBlob> Evicted; TString ErrorStr; diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index 4038f1cde3a..bba0020fe3b 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -22,8 +22,7 @@ public: } void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { - LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() - << ") at tablet " << TabletId << " (export)"); + LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() << ") at tablet " << TabletId << " (export)"); auto& event = *ev->Get(); const TUnifiedBlobId& blobId = event.BlobRange.BlobId; @@ -49,7 +48,7 @@ public: BlobsToRead.erase(blobId); Y_VERIFY(Event); - Event->Blobs[blobId] = blobData; + Event->Blobs[blobId].Data = blobData; if (BlobsToRead.empty()) { SendResultAndDie(ctx); @@ -99,7 +98,7 @@ private: } }; -IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) { +IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) { return new TExportActor(tabletId, dstActor, ev); } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 2fff0b14416..ab49a66f592 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -35,7 +35,7 @@ public: auto& event = *ev->Get(); const TUnifiedBlobId& blobId = event.BlobRange.BlobId; - Y_VERIFY(event.Data.size() == event.BlobRange.Size); + Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size); if (IndexedBlobs.count(event.BlobRange)) { if (!WaitIndexed.count(event.BlobRange)) { diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp index bd71bafdc76..29c2119ffd1 100644 --- a/ydb/core/tx/columnshard/s3_actor.cpp +++ b/ydb/core/tx/columnshard/s3_actor.cpp @@ -27,7 +27,7 @@ struct TS3Export { : Event(ev.Release()) {} - THashMap<TUnifiedBlobId, TString>& Blobs() { + TEvPrivate::TEvExport::TBlobDataMap& Blobs() { return Event->Blobs; } @@ -123,7 +123,7 @@ public: ex.KeysToWrite.emplace(key); ExportingKeys[key] = exportNo; - SendPutObject(key, std::move(blob)); + SendPutObject(key, std::move(blob.Data)); } } @@ -204,7 +204,7 @@ public: if (!errStr.empty()) { ex.Event->Status = NKikimrProto::ERROR; - ex.Event->ErrorStr = errStr; + ex.Event->ErrorStrings.emplace(key, errStr); Send(ShardActor, ex.Event.release()); Exports.erase(exportNo); } else if (ex.KeysToWrite.empty()) { |