diff options
| author | Artem Zuikov <[email protected]> | 2022-03-31 23:05:51 +0300 |
|---|---|---|
| committer | Artem Zuikov <[email protected]> | 2022-03-31 23:05:51 +0300 |
| commit | 62c1c476044dc1568ba47e2bb8e6e491fb13ed1d (patch) | |
| tree | 0144dd2d57d0384e7d76598df587cf67f34484d6 | |
| parent | d2fa7a39fd858307d4d5a6e2d548c4a6ee4f496f (diff) | |
KIKIMR-13595: delete blobs in S3
ref:676dc93d4a84554e5739222704938301fb725483
| -rw-r--r-- | ydb/core/tx/columnshard/blob.cpp | 64 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/blob.h | 7 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/blob_manager_db.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard__export.cpp | 62 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 51 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 34 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 4 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_txs.h | 15 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/s3_actor.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/ut_columnshard_schema.cpp | 14 |
10 files changed, 166 insertions, 93 deletions
diff --git a/ydb/core/tx/columnshard/blob.cpp b/ydb/core/tx/columnshard/blob.cpp index 99bde3a526a..9dac0226e52 100644 --- a/ydb/core/tx/columnshard/blob.cpp +++ b/ydb/core/tx/columnshard/blob.cpp @@ -7,46 +7,49 @@ namespace NKikimr::NOlap { -TString DsToS3Key(const TString& s) { - Y_VERIFY(s.size() > 2); - TString res = s; - res[0] = 'S'; - res[1] = '3'; - for (size_t i = 2; i < res.size(); ++i) { +// Format: "S3-f(logoBlobId)-group" +// Example: "S3-72075186224038245_51_31595_2_0_11952_0-2181038103" +TString DsIdToS3Key(const TUnifiedBlobId& dsid) { + TString res = TString("S3") + dsid.GetLogoBlobId().ToString(); + res[2] = '-'; // replace '[' + res[res.size() - 1] = '-'; // replace ']' + + for (size_t i = 0; i < res.size(); ++i) { switch (res[i]) { - case '[': - res[i] = 'a'; - break; - case ']': - res[i] = 'b'; - break; case ':': res[i] = '_'; - break; } } + + res += ToString(dsid.GetDsGroup()); return res; } -TString S3ToDsKey(const TString& s) { - Y_VERIFY(s.size() > 2); - TString res = s; - res[0] = 'D'; - res[1] = 'S'; - for (size_t i = 2; i < res.size(); ++i) { - switch (res[i]) { - case 'a': - res[i] = '['; - break; - case 'b': - res[i] = ']'; - break; +TUnifiedBlobId S3KeyToDsId(const TString& s, TString& error) { + TVector<TString> keyBucket; + Split(s, "-", keyBucket); + + ui32 dsGroup; + if (keyBucket.size() != 3 || keyBucket[0] != "S3" || !TryFromString<ui32>(keyBucket[2], dsGroup)) { + error = TStringBuilder() << "Wrong S3 key '" << s << "'"; + return TUnifiedBlobId(); + } + + TString blobId = "[" + keyBucket[1] + "]"; + for (size_t i = 0; i < blobId.size(); ++i) { + switch (blobId[i]) { case '_': - res[i] = ':'; + blobId[i] = ':'; break; } } - return res; + + TLogoBlobID logoBlobId; + if (!TLogoBlobID::Parse(logoBlobId, blobId, error)) { + return TUnifiedBlobId(); + } + + return TUnifiedBlobId(dsGroup, logoBlobId); } namespace { @@ -126,13 +129,12 @@ TUnifiedBlobId ParseS3BlobId(const TString& s, TString& error) { TVector<TString> keyBucket; Split(s, "|", keyBucket); - if (s.size() < 2 || s[0] != 'S' || s[1] != '3' || - keyBucket.size() != 2) { + if (keyBucket.size() != 2) { error = TStringBuilder() << "Wrong S3 id '" << s << "'"; return TUnifiedBlobId(); } - TUnifiedBlobId dsBlobId = ParseExtendedDsBlobId(S3ToDsKey(keyBucket[0]), error); + TUnifiedBlobId dsBlobId = S3KeyToDsId(keyBucket[0], error); if (!dsBlobId.IsValid()) { return TUnifiedBlobId(); } diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index 6b730a57a58..250b7db6eb7 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -7,9 +7,10 @@ namespace NKikimr::NOlap { class IBlobGroupSelector; +class TUnifiedBlobId; -TString DsToS3Key(const TString& s); -TString S3ToDsKey(const TString& s); +TString DsIdToS3Key(const TUnifiedBlobId& dsid); +TUnifiedBlobId S3KeyToDsId(const TString& s, TString& error); // Encapsulates different types of blob ids to simplify dealing with blobs for the // components that do not need to know where the blob is stored @@ -96,7 +97,7 @@ class TUnifiedBlobId { { Y_VERIFY(dsBlob.IsDsBlob()); DsBlobId = std::get<TDsBlobId>(dsBlob.Id); - Key = DsToS3Key(DsBlobId.ToStringNew()); + Key = DsIdToS3Key(dsBlob); } bool operator == (const TS3BlobId& other) const { diff --git a/ydb/core/tx/columnshard/blob_manager_db.cpp b/ydb/core/tx/columnshard/blob_manager_db.cpp index 52f62b86230..05802e328da 100644 --- a/ydb/core/tx/columnshard/blob_manager_db.cpp +++ b/ydb/core/tx/columnshard/blob_manager_db.cpp @@ -129,11 +129,13 @@ bool TBlobManagerDb::LoadEvicted(THashMap<TEvictedBlob, TString>& evicted, THash TString strExternId = rowset.GetValue<Schema::OneToOneEvictedBlobs::ExternBlobId>(); // TODO: CachedBlob + Y_VERIFY(state != ui8(EEvictState::UNKNOWN)); + TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString(strBlobId, &dsGroupSelector, error); Y_VERIFY(blobId.IsValid(), "%s", error.c_str()); TUnifiedBlobId externId = TUnifiedBlobId::ParseFromString(strExternId, nullptr, error); - Y_VERIFY(externId.IsValid(), "%s", error.c_str()); + Y_VERIFY((state == ui8(EEvictState::EVICTING) || externId.IsValid()), "%s", error.c_str()); TEvictedBlob evict{ .State = (EEvictState)state, diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp index 04e4381091e..d92e6e8b9a2 100644 --- a/ydb/core/tx/columnshard/columnshard__export.cpp +++ b/ydb/core/tx/columnshard/columnshard__export.cpp @@ -18,6 +18,7 @@ public: private: TEvPrivate::TEvExport::TPtr Ev; + std::vector<NOlap::TEvictedBlob> BlobsToForget; }; @@ -34,7 +35,8 @@ bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { if (status == NKikimrProto::OK) { TBlobManagerDb blobManagerDb(txc.DB); - for (auto& [blobId, externId] : msg.SrcToDstBlobs) { + for (auto& [blob, externId] : msg.SrcToDstBlobs) { + auto& blobId = blob; Y_VERIFY(blobId.IsDsBlob()); Y_VERIFY(externId.IsS3Blob()); bool dropped = false; @@ -56,9 +58,26 @@ bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { // Delayed erase of evicted blob. Blob could be already deleted. if (present && !dropped) { + LOG_S_DEBUG("Delete exported blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); Self->BlobManager->DeleteBlob(blobId, blobManagerDb); Self->IncCounter(COUNTER_BLOBS_ERASED); Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); + } else if (present) { + LOG_S_DEBUG("Stale exported blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); + + TEvictMetadata meta; + evict = Self->BlobManager->GetDropped(blobId, meta); + Y_VERIFY(evict.State == EEvictState::EXTERN); + + if (Self->DelayedForgetBlobs.count(blobId)) { + Self->DelayedForgetBlobs.erase(blobId); + BlobsToForget.emplace_back(std::move(evict)); + } else { + LOG_S_ERROR("No delayed forget for stale exported blob '" + << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); + } + } else { + LOG_S_ERROR("Exported but unknown blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); } // TODO: delete not present in S3 for sure (avoid race between export and forget) @@ -73,41 +92,36 @@ bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { return true; } -void TTxExport::Complete(const TActorContext&) { +void TTxExport::Complete(const TActorContext& ctx) { + Y_VERIFY(Ev); LOG_S_DEBUG("TTxExport.Complete at tablet " << Self->TabletID()); + + auto& msg = *Ev->Get(); + Y_VERIFY(!msg.TierName.empty()); + + if (!BlobsToForget.empty()) { + Self->ForgetBlobs(ctx, msg.TierName, std::move(BlobsToForget)); + } } void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx) { auto status = ev->Get()->Status; + ui64 exportNo = ev->Get()->ExportNo; + auto& tierName = ev->Get()->TierName; bool error = status == NKikimrProto::ERROR; if (error) { - LOG_S_WARN("Export (fail): '" << ev->Get()->ErrorStr << "' at tablet " << TabletID()); + LOG_S_WARN("Export (fail): " << exportNo << " tier '" << tierName << "' error: " + << ev->Get()->ErrorStr << "' at tablet " << TabletID()); } else if (status == NKikimrProto::UNKNOWN) { - ui64 exportNo = ev->Get()->ExportNo; - auto& tierBlobs = ev->Get()->TierBlobs; - Y_VERIFY(tierBlobs.size()); + LOG_S_DEBUG("Export (write): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); - LOG_S_DEBUG("Export (write): " << exportNo << " at tablet " << TabletID()); - - for (auto& [tierName, blobIds] : tierBlobs) { - if (!S3Actors.count(tierName)) { - TString tier(tierName); - LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); - continue; - } - auto& s3 = S3Actors[tierName]; - if (!s3) { - TString tier(tierName); - LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); - continue; - } - auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, s3, std::move(blobIds)); - ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); - } + auto& tierBlobs = ev->Get()->Blobs; + Y_VERIFY(tierBlobs.size()); + ExportBlobs(ctx, exportNo, tierName, std::move(tierBlobs)); } else if (status == NKikimrProto::OK) { - LOG_S_DEBUG("Export (apply) at tablet " << TabletID()); + LOG_S_DEBUG("Export (apply): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); Execute(new TTxExport(this, ev), ctx); } else { diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 23ca51fe5fb..a02eb8673f1 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -25,6 +25,7 @@ public: private: TEvPrivate::TEvWriteIndex::TPtr Ev; THashMap<TUnifiedBlobId, TString> BlobsToExport; + THashMap<TString, THashMap<TUnifiedBlobId, TString>> ExportTierBlobs; THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget; ui64 ExportNo = 0; }; @@ -159,12 +160,23 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { TEvictMetadata meta; auto evict = Self->BlobManager->GetDropped(blobId, meta); Y_VERIFY(evict.State != EEvictState::UNKNOWN); + + bool exported = ui8(evict.State) == ui8(EEvictState::SELF_CACHED) || + ui8(evict.State) == ui8(EEvictState::EXTERN); + if (exported) { + LOG_S_DEBUG("Forget blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); + TierBlobsToForget[meta.GetTierName()].emplace_back(std::move(evict)); + } else { + LOG_S_DEBUG("Deleyed forget blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); + Self->DelayedForgetBlobs.insert(blobId); + } + bool deleted = ui8(evict.State) >= ui8(EEvictState::EXTERN); // !EVICTING and !SELF_CACHED - TierBlobsToForget[meta.GetTierName()].emplace_back(std::move(evict)); if (deleted) { continue; } } + LOG_S_DEBUG("Delete evicting blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); Self->BlobManager->DeleteBlob(blobId, blobManagerDb); Self->IncCounter(COUNTER_BLOBS_ERASED); Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); @@ -199,7 +211,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { } if (BlobsToExport.size()) { - ExportNo = ++Self->LastExportNo; + for (auto& [blobId, tierName] : BlobsToExport) { + ExportTierBlobs[tierName].emplace(blobId, TString{}); + } + BlobsToExport.clear(); + + ExportNo = Self->LastExportNo + 1; + Self->LastExportNo += ExportTierBlobs.size(); NIceDb::TNiceDb db(txc.DB); Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); @@ -252,31 +270,14 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Self->EnqueueBackgroundActivities(); } - if (ExportNo) { - Y_VERIFY(BlobsToExport.size()); - THashMap<TString, THashMap<TUnifiedBlobId, TString>> tierBlobs; - for (auto& [blobId, tierName] : BlobsToExport) { - tierBlobs[tierName].emplace(blobId, TString()); - } - ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, std::move(tierBlobs))); + for (auto& [tierName, blobIds] : ExportTierBlobs) { + Y_VERIFY(ExportNo); + ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobIds))); + ++ExportNo; } - for (auto [tierName, blobs] : TierBlobsToForget) { - if (!Self->S3Actors.count(tierName)) { - TString tier(tierName); - LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID()); - continue; - } - auto& s3 = Self->S3Actors[tierName]; - if (!s3) { - TString tier(tierName); - LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID()); - continue; - } - - auto forget = std::make_unique<TEvPrivate::TEvForget>(); - forget->Evicted = std::move(blobs); - ctx.Send(s3, forget.release()); + for (auto& [tierName, blobs] : TierBlobsToForget) { + Self->ForgetBlobs(ctx, tierName, std::move(blobs)); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index abd00790d9a..8e2d7f05adf 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -859,6 +859,40 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl return indexInfo; } +void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, + THashMap<TUnifiedBlobId, TString>&& blobsIds) { + if (!S3Actors.count(tierName)) { + TString tier(tierName); + LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); + return; + } + auto& s3 = S3Actors[tierName]; + if (!s3) { + TString tier(tierName); + LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); + return; + } + auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds)); + ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); +} + +void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, + std::vector<NOlap::TEvictedBlob>&& blobs) { + if (!S3Actors.count(tierName)) { + LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID()); + return; + } + auto& s3 = S3Actors[tierName]; + if (!s3) { + LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on forget) at tablet " << TabletID()); + return; + } + + auto forget = std::make_unique<TEvPrivate::TEvForget>(); + forget->Evicted = std::move(blobs); + ctx.Send(s3, forget.release()); +} + ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) { ui32 count = 0; #ifndef KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b0959555536..6591cae2f9d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -360,6 +360,7 @@ private: std::unique_ptr<NOlap::TInsertTable> InsertTable; std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; THashMap<TString, TTierConfig> TierConfigs; + THashSet<NOlap::TUnifiedBlobId> DelayedForgetBlobs; TTtl Ttl; THashMap<ui64, TBasicTxInfo> BasicTxInfo; @@ -425,6 +426,9 @@ private: void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); + void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, + THashMap<TUnifiedBlobId, TString>&& blobsIds); + void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs); ui32 InitS3Actors(const TActorContext& ctx, bool init); void StopS3Actors(const TActorContext& ctx); diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h index a3ecac75a39..076efaa758b 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_txs.h @@ -97,25 +97,32 @@ struct TEvPrivate { NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; ui64 ExportNo{}; + TString TierName; TActorId DstActor; - THashMap<TString, TBlobDataMap> TierBlobs; TBlobDataMap Blobs; THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; TString ErrorStr; - explicit TEvExport(ui64 exportNo, THashMap<TString, TBlobDataMap>&& tierBlobs) + explicit TEvExport(ui64 exportNo, const TString& tierName, TBlobDataMap&& tierBlobs) : ExportNo(exportNo) - , TierBlobs(std::move(tierBlobs)) + , TierName(tierName) + , Blobs(std::move(tierBlobs)) { Y_VERIFY(ExportNo); + Y_VERIFY(!TierName.empty()); + Y_VERIFY(!Blobs.empty()); } - TEvExport(ui64 exportNo, TActorId dstActor, TBlobDataMap&& blobs) + TEvExport(ui64 exportNo, const TString& tierName, TActorId dstActor, TBlobDataMap&& blobs) : ExportNo(exportNo) + , TierName(tierName) , DstActor(dstActor) , Blobs(std::move(blobs)) { Y_VERIFY(ExportNo); + Y_VERIFY(!TierName.empty()); + Y_VERIFY(DstActor); + Y_VERIFY(!Blobs.empty()); } }; diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp index 8d61847214f..4d91b2b4fb5 100644 --- a/ydb/core/tx/columnshard/s3_actor.cpp +++ b/ydb/core/tx/columnshard/s3_actor.cpp @@ -131,8 +131,8 @@ public: for (auto& evict : forget.Event->Evicted) { if (!evict.ExternBlob.IsValid()) { - LOG_S_INFO("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId); - continue; // TODO + LOG_S_ERROR("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId); + continue; } TString key = evict.ExternBlob.GetS3Key(); diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 4af7892db2f..cd37e85dc62 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -316,12 +316,16 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); + + if (reboots) { + RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + } } TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); #if 0 if (i) { - sleep(1); // TODO: wait export + sleep(10); // TODO: wait export } #endif // Read @@ -355,6 +359,10 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe } else { resColumns.emplace_back(nullptr, 0); } + + if (reboots) { + RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + } } return resColumns; @@ -598,12 +606,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { Y_UNIT_TEST(ColdTiers) { TestHotAndColdTiers(false); } -#if 0 + Y_UNIT_TEST(RebootColdTiers) { NColumnShard::gAllowLogBatchingDefaultValue = false; TestHotAndColdTiers(true); } -#endif + Y_UNIT_TEST(Drop) { TestDrop(false); } |
