diff options
author | chertus <azuikov@ydb.tech> | 2023-02-20 20:39:15 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-02-20 20:39:15 +0300 |
commit | 95faf209707e9cb22e5109c9bf35611fe7c1d449 (patch) | |
tree | b5f8d56b6939423dd29ad8b519473b81d720c98d | |
parent | f7ff62905473e34a95664927b43cd3b483f01986 (diff) | |
download | ydb-95faf209707e9cb22e5109c9bf35611fe7c1d449.tar.gz |
apply partial export at ColumnShard
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__export.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_private_events.h | 19 | ||||
-rw-r--r-- | ydb/core/tx/tiering/s3_actor.cpp | 47 |
3 files changed, 61 insertions, 28 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp index f2d7b8eb0a..92ee25ad99 100644 --- a/ydb/core/tx/columnshard/columnshard__export.cpp +++ b/ydb/core/tx/columnshard/columnshard__export.cpp @@ -33,7 +33,7 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) { auto& msg = *Ev->Get(); auto status = msg.Status; - if (status == NKikimrProto::OK) { + { TBlobManagerDb blobManagerDb(txc.DB); for (auto& [blob, externId] : msg.SrcToDstBlobs) { @@ -42,6 +42,11 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(externId.IsS3Blob()); bool dropped = false; + if (!msg.Blobs.count(blobId)) { + Y_VERIFY(!msg.ErrorStrings.empty()); + continue; // not exported + } + #if 0 // TODO: SELF_CACHED logic NOlap::TEvictedBlob evict{ .State = EEvictState::SELF_CACHED, @@ -78,7 +83,9 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) { // TODO: delete not present in S3 for sure (avoid race between export and forget) #endif } + } + if (status == NKikimrProto::OK) { Self->IncCounter(COUNTER_EXPORT_SUCCESS); } else { Self->IncCounter(COUNTER_EXPORT_FAIL); @@ -112,15 +119,19 @@ void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& if (status == NKikimrProto::UNKNOWN) { LOG_S_DEBUG("Export (write): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); ExportBlobs(ctx, exportNo, tierName, pathId, std::move(msg.Blobs)); - } else if (status == NKikimrProto::OK) { - LOG_S_DEBUG("Export (apply): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); - Execute(new TTxExportFinish(this, ev), ctx); - } else if (status == NKikimrProto::ERROR) { + } else if (status == NKikimrProto::ERROR && msg.Blobs.empty()) { LOG_S_WARN("Export (fail): id " << exportNo << " tier '" << tierName << "' error: " << ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID()); --ActiveEvictions; } else { - Y_VERIFY(false); + // There's no atomicity needed here. Allow partial export + if (status == NKikimrProto::ERROR) { + LOG_S_WARN("Export (partial): id " << exportNo << " tier '" << tierName << "' error: " + << ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID()); + } else { + LOG_S_DEBUG("Export (apply): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID()); + } + Execute(new TTxExportFinish(this, ev), ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index b00f8e0441..b614ca9d7a 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -131,7 +131,7 @@ struct TEvPrivate { TString TierName; ui64 PathId = 0; TActorId DstActor; - TBlobDataMap Blobs; + TBlobDataMap Blobs; // src: blobId -> data map; dst: exported blobIds set THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; TMap<TString, TString> ErrorStrings; @@ -161,6 +161,23 @@ struct TEvPrivate { Y_VERIFY(!Blobs.empty()); } + void AddResult(const TUnifiedBlobId& blobId, const TString& key, const bool hasError, const TString& errStr) { + if (hasError) { + Status = NKikimrProto::ERROR; + Y_VERIFY(ErrorStrings.emplace(key, errStr).second, "%s", key.data()); + Blobs.erase(blobId); + } else if (!ErrorStrings.count(key)) { // (OK + !OK) == !OK + Y_VERIFY(Blobs.count(blobId)); + if (Status == NKikimrProto::UNKNOWN) { + Status = NKikimrProto::OK; + } + } + } + + bool Finished() const { + return (Blobs.size() + ErrorStrings.size()) == SrcToDstBlobs.size(); + } + TString SerializeErrorsToString() const { TStringBuilder sb; for (auto&& i : ErrorStrings) { diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp index 5caaa56f45..7ab3c7a93d 100644 --- a/ydb/core/tx/tiering/s3_actor.cpp +++ b/ydb/core/tx/tiering/s3_actor.cpp @@ -44,17 +44,16 @@ public: return KeysToWrite.empty(); } - TS3Export& RegisterKey(const TString& key) { - KeysToWrite.emplace(key); - return *this; + void RegisterKey(const TString& key, const TUnifiedBlobId& blobId) { + KeysToWrite.emplace(key, blobId); } - TS3Export& FinishKey(const TString& key) { - KeysToWrite.erase(key); - return *this; + TUnifiedBlobId FinishKey(const TString& key) { + auto node = KeysToWrite.extract(key); + return node.mapped(); } private: - TSet<TString> KeysToWrite; + std::unordered_map<TString, TUnifiedBlobId> KeysToWrite; }; struct TS3Forget { @@ -118,15 +117,19 @@ public: ui64 exportNo = msg.ExportNo; Y_VERIFY(msg.DstActor == ShardActor); - Y_VERIFY(!Exports.count(exportNo)); + if (Exports.count(exportNo)) { + LOG_S_ERROR("[S3] Multiple exports with same export id '" << exportNo << "' at tablet " << TabletId); + return; + } + Exports[exportNo] = TS3Export(ev->Release()); auto& ex = Exports[exportNo]; for (auto& [blobId, blobData] : ex.Blobs()) { TString key = ex.AddExported(blobId, msg.PathId).GetS3Key(); - Y_VERIFY(!ExportingKeys.count(key)); // TODO + Y_VERIFY(!ExportingKeys.count(key)); // TODO: allow reexport? - ex.RegisterKey(key); + ex.RegisterKey(key, blobId); ExportingKeys[key] = exportNo; SendPutObjectIfNotExists(key, std::move(blobData)); @@ -185,7 +188,6 @@ public: } } - // TODO: clean written blobs in failed export void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { Y_VERIFY(Initialized()); @@ -198,7 +200,11 @@ public: errStr = LogError("PutObjectResponse", resultOutcome.GetError(), msg.Key); } - Y_VERIFY(msg.Key); // FIXME + if (!msg.Key || msg.Key->empty()) { + LOG_S_ERROR("[S3] no key in PutObjectResponse at tablet " << TabletId); + return; + } + const TString key = *msg.Key; LOG_S_DEBUG("[S3] PutObjectResponse '" << key << "' at tablet " << TabletId); @@ -254,7 +260,11 @@ public: errStr = LogError("DeleteObjectResponse", resultOutcome.GetError(), msg.Key); } - Y_VERIFY(msg.Key); // FIXME + if (!msg.Key || msg.Key->empty()) { + LOG_S_ERROR("[S3] no key in DeleteObjectResponse at tablet " << TabletId); + return; + } + TString key = *msg.Key; LOG_S_DEBUG("[S3] DeleteObjectResponse '" << key << "' at tablet " << TabletId); @@ -362,17 +372,12 @@ public: } auto& ex = it->second; - ex.FinishKey(key); + TUnifiedBlobId blobId = ex.FinishKey(key); - if (hasError) { - ex.Event->Status = NKikimrProto::ERROR; - Y_VERIFY(ex.Event->ErrorStrings.emplace(key, errStr).second, "%s", key.data()); - } + ex.Event->AddResult(blobId, key, hasError, errStr); if (ex.ExtractionFinished()) { - if (ex.Event->Status == NKikimrProto::UNKNOWN) { - ex.Event->Status = NKikimrProto::OK; - } + Y_VERIFY(ex.Event->Finished()); Send(ShardActor, ex.Event.release()); Exports.erase(exportNo); } |