summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <[email protected]>2023-02-03 14:46:46 +0300
committerchertus <[email protected]>2023-02-03 14:46:46 +0300
commit26b6e08dbc1f4fb74fe369e9fbca6e877893c926 (patch)
treed9b73c84c07519dd7348285795846b70771683c0
parent19d375b572297a224fabbfc454176d875353c8dc (diff)
fix S3 blobs deletion
-rw-r--r--ydb/core/tx/columnshard/blob.h9
-rw-r--r--ydb/core/tx/columnshard/columnshard__export.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp76
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h1
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp4
-rw-r--r--ydb/core/wrappers/s3_storage.h4
7 files changed, 77 insertions, 46 deletions
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index c0f0e723531..6c11c16787e 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -315,6 +315,15 @@ enum class EEvictState : ui8 {
ERASED = 5, // source, extern, cached blobs: ---
};
+inline bool IsExported(EEvictState state) {
+ return state == EEvictState::SELF_CACHED ||
+ state == EEvictState::EXTERN;
+}
+
+inline bool IsDeleted(EEvictState state) {
+ return ui8(state) >= ui8(EEvictState::EXTERN); // !EVICTING and !SELF_CACHED
+}
+
struct TEvictedBlob {
EEvictState State = EEvictState::UNKNOWN;
TUnifiedBlobId Blob;
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp
index 9c52b288a43..cc7dce2d392 100644
--- a/ydb/core/tx/columnshard/columnshard__export.cpp
+++ b/ydb/core/tx/columnshard/columnshard__export.cpp
@@ -116,15 +116,15 @@ void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext&
auto& tierName = ev->Get()->TierName;
if (status == NKikimrProto::UNKNOWN) {
- LOG_S_DEBUG("Export (write): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
+ LOG_S_DEBUG("Export (write): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
auto& tierBlobs = ev->Get()->Blobs;
Y_VERIFY(tierBlobs.size());
ExportBlobs(ctx, exportNo, tierName, std::move(tierBlobs));
} else if (status == NKikimrProto::OK) {
- LOG_S_DEBUG("Export (apply): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
+ LOG_S_DEBUG("Export (apply): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
Execute(new TTxExportFinish(this, ev), ctx);
} else if (status == NKikimrProto::ERROR) {
- LOG_S_WARN("Export (fail): " << exportNo << " tier '" << tierName << "' error: "
+ LOG_S_WARN("Export (fail): id " << exportNo << " tier '" << tierName << "' error: "
<< ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID());
--ActiveEvictions;
} else {
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 84dfc8cf81f..37824884410 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -34,7 +34,7 @@ private:
TEvPrivate::TEvWriteIndex::TPtr Ev;
THashMap<TString, TPathIdBlobs> ExportTierBlobs;
- THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget;
+ THashSet<TUnifiedBlobId> BlobsToForget;
ui64 ExportNo = 0;
};
@@ -129,6 +129,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
}
}
+ THashSet<TUnifiedBlobId> protectedBlobs;
+
Self->IncCounter(COUNTER_EVICTION_PORTIONS_WRITTEN, changes->PortionsToEvict.size());
for (auto& [portionInfo, evictionFeatures] : changes->PortionsToEvict) {
auto& tierName = portionInfo.TierName;
@@ -142,30 +144,47 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
for (auto& rec : portionInfo.Records) {
auto& blobId = rec.BlobRange.BlobId;
if (!blobsToExport.count(blobId)) {
- blobsToExport.emplace(blobId, evictionFeatures);
-
NKikimrTxColumnShard::TEvictMetadata meta;
meta.SetTierName(tierName);
- Self->BlobManager->ExportOneToOne(blobId, meta, blobManagerDb);
+ if (Self->BlobManager->ExportOneToOne(blobId, meta, blobManagerDb)) {
+ blobsToExport.emplace(blobId, evictionFeatures);
+ } else {
+ // TODO: support S3 -> S3 eviction
+ LOG_S_ERROR("Prevent evict evicted blob '" << blobId.ToStringNew()
+ << "' at tablet " << Self->TabletID());
+ protectedBlobs.insert(blobId);
+ }
}
}
}
}
- const auto& portionsToDrop = changes->PortionsToDrop;
+ // Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data
THashSet<TUnifiedBlobId> blobsToDrop;
- Self->IncCounter(COUNTER_PORTIONS_ERASED, portionsToDrop.size());
- for (const auto& portionInfo : portionsToDrop) {
- for (const auto& rec : portionInfo.Records) {
- blobsToDrop.insert(rec.BlobRange.BlobId);
+ for (const auto& rec : changes->EvictedRecords) {
+ const auto& blobId = rec.BlobRange.BlobId;
+ if (blobsToExport.count(blobId)) {
+ // Eviction to S3. TTxExportFinish will delete src blob when dst blob get EEvictState::EXTERN state.
+ } else if (!protectedBlobs.count(blobId)) {
+ // We could drop the blob immediately
+ if (!blobsToDrop.count(blobId)) {
+ LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
+ blobsToDrop.insert(blobId);
+ }
+
}
- Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum());
}
- // Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data
- THashSet<TUnifiedBlobId> blobsToEvict;
- for (const auto& rec : changes->EvictedRecords) {
- blobsToEvict.insert(rec.BlobRange.BlobId);
+ Self->IncCounter(COUNTER_PORTIONS_ERASED, changes->PortionsToDrop.size());
+ for (const auto& portionInfo : changes->PortionsToDrop) {
+ for (const auto& rec : portionInfo.Records) {
+ const auto& blobId = rec.BlobRange.BlobId;
+ if (!blobsToDrop.count(blobId)) {
+ LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
+ blobsToDrop.insert(blobId);
+ }
+ }
+ Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum());
}
for (const auto& blobId : blobsToDrop) {
@@ -174,32 +193,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
auto evict = Self->BlobManager->GetDropped(blobId, meta);
Y_VERIFY(evict.State != EEvictState::UNKNOWN);
- bool exported = ui8(evict.State) == ui8(EEvictState::SELF_CACHED) ||
- ui8(evict.State) == ui8(EEvictState::EXTERN);
- if (exported) {
- LOG_S_DEBUG("Forget blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
- TierBlobsToForget[meta.GetTierName()].emplace_back(std::move(evict));
- } else {
- LOG_S_DEBUG("Deleyed forget blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
- Self->DelayedForgetBlobs.insert(blobId);
- }
+ BlobsToForget.insert(blobId);
- bool deleted = ui8(evict.State) >= ui8(EEvictState::EXTERN); // !EVICTING and !SELF_CACHED
- if (deleted) {
+ if (NOlap::IsDeleted(evict.State)) {
+ LOG_S_DEBUG("Skip delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
continue;
}
}
- LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
- Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
- Self->IncCounter(COUNTER_BLOBS_ERASED);
- Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
- }
- for (const auto& blobId : blobsToEvict) {
- if (blobsToExport.count(blobId)) {
- // DS to S3 eviction. Keep source blob in DS till EEvictState::EXTERN state.
- continue;
- }
- LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
Self->IncCounter(COUNTER_BLOBS_ERASED);
Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
@@ -311,9 +311,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
++ExportNo;
}
- for (auto& [tierName, blobs] : TierBlobsToForget) {
- Self->ForgetBlobs(ctx, tierName, std::move(blobs));
- }
+ Self->ForgetBlobs(ctx, BlobsToForget);
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index f5a70716e53..28152722828 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -1055,6 +1055,29 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName
}
}
+void TColumnShard::ForgetBlobs(const TActorContext& ctx, const THashSet<TUnifiedBlobId>& blobs) {
+ THashMap<TString, std::vector<NOlap::TEvictedBlob>> tierBlobs;
+
+ for (const auto& blobId : blobs) {
+ TEvictMetadata meta;
+ auto evict = BlobManager->GetDropped(blobId, meta);
+
+ if (evict.State == EEvictState::UNKNOWN) {
+ LOG_S_ERROR("Forget unknown blob '" << blobId.ToStringNew() << "' at tablet " << TabletID());
+ } else if (NOlap::IsExported(evict.State)) {
+ LOG_S_DEBUG("Forget blob '" << blobId.ToStringNew() << "' at tablet " << TabletID());
+ tierBlobs[meta.GetTierName()].emplace_back(std::move(evict));
+ } else {
+ LOG_S_DEBUG("Forget blob (deleyed) '" << blobId.ToStringNew() << "' at tablet " << TabletID());
+ DelayedForgetBlobs.insert(blobId);
+ }
+ }
+
+ for (auto& [tierName, blobs] : tierBlobs) {
+ ForgetBlobs(ctx, tierName, std::move(blobs));
+ }
+}
+
bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) {
if (auto s3 = GetS3ActorForTier(tierName)) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index cdf4b7346ae..930579664c6 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -455,6 +455,7 @@ private:
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const;
void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const;
+ void ForgetBlobs(const TActorContext& ctx, const THashSet<TUnifiedBlobId>& blobs);
bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges);
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index 80277426652..566931e2a44 100644
--- a/ydb/core/tx/tiering/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -418,8 +418,8 @@ private:
void SendPutObject(const TString& key, TString&& data) const {
auto request = Aws::S3::Model::PutObjectRequest()
- .WithKey(key)
- .WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA);
+ .WithKey(key);
+ //.WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA); // TODO: move to config
#if 0
Aws::Map<Aws::String, Aws::String> metadata;
metadata.emplace("Content-Type", "application/x-compressed");
diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h
index c5f239b1a42..4317570bb97 100644
--- a/ydb/core/wrappers/s3_storage.h
+++ b/ydb/core/wrappers/s3_storage.h
@@ -53,13 +53,13 @@ private:
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
const auto* ctx = static_cast<const TCtx*>(context.get());
- LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response"
+ LOG_INFO_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response"
<< ": uuid# " << ctx->GetUUID()
<< ", response# " << outcome);
ctx->Reply(request, outcome);
};
- LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::S3_WRAPPER, "Request"
+ LOG_INFO_S(*TlsActivationContext, NKikimrServices::S3_WRAPPER, "Request"
<< ": uuid# " << ctx->GetUUID()
<< ", request# " << ev->Get()->GetRequest());
func(Client.Get(), ctx->PrepareRequest(ev), callback, ctx);