aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-02-20 20:39:15 +0300
committerchertus <azuikov@ydb.tech>2023-02-20 20:39:15 +0300
commit95faf209707e9cb22e5109c9bf35611fe7c1d449 (patch)
treeb5f8d56b6939423dd29ad8b519473b81d720c98d
parentf7ff62905473e34a95664927b43cd3b483f01986 (diff)
downloadydb-95faf209707e9cb22e5109c9bf35611fe7c1d449.tar.gz
apply partial export at ColumnShard
-rw-r--r--ydb/core/tx/columnshard/columnshard__export.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h19
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp47
3 files changed, 61 insertions, 28 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp
index f2d7b8eb0a..92ee25ad99 100644
--- a/ydb/core/tx/columnshard/columnshard__export.cpp
+++ b/ydb/core/tx/columnshard/columnshard__export.cpp
@@ -33,7 +33,7 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) {
auto& msg = *Ev->Get();
auto status = msg.Status;
- if (status == NKikimrProto::OK) {
+ {
TBlobManagerDb blobManagerDb(txc.DB);
for (auto& [blob, externId] : msg.SrcToDstBlobs) {
@@ -42,6 +42,11 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(externId.IsS3Blob());
bool dropped = false;
+ if (!msg.Blobs.count(blobId)) {
+ Y_VERIFY(!msg.ErrorStrings.empty());
+ continue; // not exported
+ }
+
#if 0 // TODO: SELF_CACHED logic
NOlap::TEvictedBlob evict{
.State = EEvictState::SELF_CACHED,
@@ -78,7 +83,9 @@ bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) {
// TODO: delete not present in S3 for sure (avoid race between export and forget)
#endif
}
+ }
+ if (status == NKikimrProto::OK) {
Self->IncCounter(COUNTER_EXPORT_SUCCESS);
} else {
Self->IncCounter(COUNTER_EXPORT_FAIL);
@@ -112,15 +119,19 @@ void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext&
if (status == NKikimrProto::UNKNOWN) {
LOG_S_DEBUG("Export (write): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
ExportBlobs(ctx, exportNo, tierName, pathId, std::move(msg.Blobs));
- } else if (status == NKikimrProto::OK) {
- LOG_S_DEBUG("Export (apply): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
- Execute(new TTxExportFinish(this, ev), ctx);
- } else if (status == NKikimrProto::ERROR) {
+ } else if (status == NKikimrProto::ERROR && msg.Blobs.empty()) {
LOG_S_WARN("Export (fail): id " << exportNo << " tier '" << tierName << "' error: "
<< ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID());
--ActiveEvictions;
} else {
- Y_VERIFY(false);
+ // There's no atomicity needed here. Allow partial export
+ if (status == NKikimrProto::ERROR) {
+ LOG_S_WARN("Export (partial): id " << exportNo << " tier '" << tierName << "' error: "
+ << ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID());
+ } else {
+ LOG_S_DEBUG("Export (apply): id " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
+ }
+ Execute(new TTxExportFinish(this, ev), ctx);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index b00f8e0441..b614ca9d7a 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -131,7 +131,7 @@ struct TEvPrivate {
TString TierName;
ui64 PathId = 0;
TActorId DstActor;
- TBlobDataMap Blobs;
+ TBlobDataMap Blobs; // src: blobId -> data map; dst: exported blobIds set
THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs;
TMap<TString, TString> ErrorStrings;
@@ -161,6 +161,23 @@ struct TEvPrivate {
Y_VERIFY(!Blobs.empty());
}
+ void AddResult(const TUnifiedBlobId& blobId, const TString& key, const bool hasError, const TString& errStr) {
+ if (hasError) {
+ Status = NKikimrProto::ERROR;
+ Y_VERIFY(ErrorStrings.emplace(key, errStr).second, "%s", key.data());
+ Blobs.erase(blobId);
+ } else if (!ErrorStrings.count(key)) { // (OK + !OK) == !OK
+ Y_VERIFY(Blobs.count(blobId));
+ if (Status == NKikimrProto::UNKNOWN) {
+ Status = NKikimrProto::OK;
+ }
+ }
+ }
+
+ bool Finished() const {
+ return (Blobs.size() + ErrorStrings.size()) == SrcToDstBlobs.size();
+ }
+
TString SerializeErrorsToString() const {
TStringBuilder sb;
for (auto&& i : ErrorStrings) {
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index 5caaa56f45..7ab3c7a93d 100644
--- a/ydb/core/tx/tiering/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -44,17 +44,16 @@ public:
return KeysToWrite.empty();
}
- TS3Export& RegisterKey(const TString& key) {
- KeysToWrite.emplace(key);
- return *this;
+ void RegisterKey(const TString& key, const TUnifiedBlobId& blobId) {
+ KeysToWrite.emplace(key, blobId);
}
- TS3Export& FinishKey(const TString& key) {
- KeysToWrite.erase(key);
- return *this;
+ TUnifiedBlobId FinishKey(const TString& key) {
+ auto node = KeysToWrite.extract(key);
+ return node.mapped();
}
private:
- TSet<TString> KeysToWrite;
+ std::unordered_map<TString, TUnifiedBlobId> KeysToWrite;
};
struct TS3Forget {
@@ -118,15 +117,19 @@ public:
ui64 exportNo = msg.ExportNo;
Y_VERIFY(msg.DstActor == ShardActor);
- Y_VERIFY(!Exports.count(exportNo));
+ if (Exports.count(exportNo)) {
+ LOG_S_ERROR("[S3] Multiple exports with same export id '" << exportNo << "' at tablet " << TabletId);
+ return;
+ }
+
Exports[exportNo] = TS3Export(ev->Release());
auto& ex = Exports[exportNo];
for (auto& [blobId, blobData] : ex.Blobs()) {
TString key = ex.AddExported(blobId, msg.PathId).GetS3Key();
- Y_VERIFY(!ExportingKeys.count(key)); // TODO
+ Y_VERIFY(!ExportingKeys.count(key)); // TODO: allow reexport?
- ex.RegisterKey(key);
+ ex.RegisterKey(key, blobId);
ExportingKeys[key] = exportNo;
SendPutObjectIfNotExists(key, std::move(blobData));
@@ -185,7 +188,6 @@ public:
}
}
- // TODO: clean written blobs in failed export
void Handle(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
Y_VERIFY(Initialized());
@@ -198,7 +200,11 @@ public:
errStr = LogError("PutObjectResponse", resultOutcome.GetError(), msg.Key);
}
- Y_VERIFY(msg.Key); // FIXME
+ if (!msg.Key || msg.Key->empty()) {
+ LOG_S_ERROR("[S3] no key in PutObjectResponse at tablet " << TabletId);
+ return;
+ }
+
const TString key = *msg.Key;
LOG_S_DEBUG("[S3] PutObjectResponse '" << key << "' at tablet " << TabletId);
@@ -254,7 +260,11 @@ public:
errStr = LogError("DeleteObjectResponse", resultOutcome.GetError(), msg.Key);
}
- Y_VERIFY(msg.Key); // FIXME
+ if (!msg.Key || msg.Key->empty()) {
+ LOG_S_ERROR("[S3] no key in DeleteObjectResponse at tablet " << TabletId);
+ return;
+ }
+
TString key = *msg.Key;
LOG_S_DEBUG("[S3] DeleteObjectResponse '" << key << "' at tablet " << TabletId);
@@ -362,17 +372,12 @@ public:
}
auto& ex = it->second;
- ex.FinishKey(key);
+ TUnifiedBlobId blobId = ex.FinishKey(key);
- if (hasError) {
- ex.Event->Status = NKikimrProto::ERROR;
- Y_VERIFY(ex.Event->ErrorStrings.emplace(key, errStr).second, "%s", key.data());
- }
+ ex.Event->AddResult(blobId, key, hasError, errStr);
if (ex.ExtractionFinished()) {
- if (ex.Event->Status == NKikimrProto::UNKNOWN) {
- ex.Event->Status = NKikimrProto::OK;
- }
+ Y_VERIFY(ex.Event->Finished());
Send(ShardActor, ex.Event.release());
Exports.erase(exportNo);
}