diff options
| author | chertus <[email protected]> | 2023-07-21 13:13:22 +0300 | 
|---|---|---|
| committer | chertus <[email protected]> | 2023-07-21 13:13:22 +0300 | 
| commit | 122a6055cef2bc785407c69b33b858a07b319e66 (patch) | |
| tree | 5d49385d99458650bfd401ba3751e8573d83f6f7 | |
| parent | 432e25fdf8ca8de82d63eb6663b62a5fba924cd0 (diff) | |
KIKIMR-18779 batch S3 deletes
| -rw-r--r-- | ydb/core/tx/tiering/s3_actor.cpp | 95 | 
1 files changed, 82 insertions, 13 deletions
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp index 1c5f17d7e65..0c5c930757d 100644 --- a/ydb/core/tx/tiering/s3_actor.cpp +++ b/ydb/core/tx/tiering/s3_actor.cpp @@ -181,7 +181,11 @@ public:          std::vector<NOlap::TEvictedBlob> newEvicted;          newEvicted.reserve(eventEvicted.size()); -        for (auto&& evict : forget.Event->Evicted) { +        static constexpr ui32 DELETE_PORTION = 1000; +        std::vector<TString> keys; +        keys.reserve(DELETE_PORTION); + +        for (auto&& evict : eventEvicted) {              if (!evict.ExternBlob.IsS3Blob()) {                  LOG_S_ERROR("[S3] Forget not exported '" << evict.Blob << "' at tablet " << TabletId);                  continue; @@ -198,7 +202,15 @@ public:                  ForgettingKeys[key] = forgetNo;              } -            SendDeleteObject(key); +            keys.push_back(key); +            if (keys.size() == DELETE_PORTION) { +                SendDeleteObjects(keys); +                keys.clear(); +            } +        } +        if (keys.size()) { +            SendDeleteObjects(keys); +            keys.clear();          }          eventEvicted.swap(newEvicted); @@ -296,7 +308,7 @@ public:          auto& msg = *ev->Get();          const auto& resultOutcome = msg.Result; -        TString errStr; +        std::optional<TString> errStr;          if (!resultOutcome.IsSuccess()) {              errStr = LogError("DeleteObjectResponse", resultOutcome.GetError(), msg.Key);          } @@ -306,10 +318,12 @@ public:              return;          } -        TString key = *msg.Key; +        ForgetObject(*msg.Key, errStr); +    } +    void ForgetObject(const TString& key, const std::optional<TString>& errStr) {          if (!ForgettingKeys.count(key)) { -            LOG_S_INFO("[S3] DeleteObjectResponse for unknown key '" << key << "' at tablet " << TabletId); +            LOG_S_INFO("[S3] DeleteObject(s)Response for unknown key '" << key << "' at tablet " << TabletId);              return;          } @@ -317,19 +331,19 @@ public:          ForgettingKeys.erase(key);          if (!Forgets.count(forgetNo)) { -            LOG_S_INFO("[S3] DeleteObjectResponse for unknown forget with key '" << key << "' at tablet " << TabletId); +            LOG_S_INFO("[S3] DeleteObject(s)Response for unknown forget with key '" << key << "' at tablet " << TabletId);              return;          } -        LOG_S_NOTICE("[S3] DeleteObjectResponse '" << key << "' " -            << (resultOutcome.IsSuccess() ? "OK" : resultOutcome.GetError().GetMessage()) << " at tablet " << TabletId); +        LOG_S_NOTICE("[S3] DeleteObject(s)Response '" << key << "' " +            << (errStr ? *errStr : "OK") << " at tablet " << TabletId);          auto& forget = Forgets[forgetNo];          forget.KeysToDelete.erase(key); -        if (!errStr.empty()) { +        if (errStr) {              forget.Event->Status = NKikimrProto::ERROR; -            forget.Event->ErrorStr = errStr; +            forget.Event->ErrorStr = *errStr;              Send(ShardActor, forget.Event.release());              Forgets.erase(forgetNo);          } else if (forget.KeysToDelete.empty()) { @@ -339,6 +353,28 @@ public:          }      } +    void Handle(TEvExternalStorage::TEvDeleteObjectsResponse::TPtr& ev) { +        Y_VERIFY(Initialized()); + +        auto& msg = *ev->Get(); +        const auto& resultOutcome = msg.Result; +        const auto& objsDeleted = resultOutcome.GetResult().GetDeleted(); + +        std::optional<TString> errStr; +        if (!resultOutcome.IsSuccess()) { +            errStr = LogError("DeleteObjectsResponse", resultOutcome.GetError(), objsDeleted); +        } + +        if (objsDeleted.empty()) { +            LOG_S_ERROR("[S3] no keys in DeleteObjectsResponse at tablet " << TabletId); +            return; +        } + +        for (const auto& obj : objsDeleted) { +            ForgetObject(TString(obj.GetKey()), errStr); +        } +    } +      void Handle(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {          Y_VERIFY(Initialized()); @@ -449,6 +485,7 @@ private:              cFunc(TEvents::TEvPoisonPill::EventType, PassAway);              hFunc(TEvExternalStorage::TEvPutObjectResponse, Handle);              hFunc(TEvExternalStorage::TEvDeleteObjectResponse, Handle); +            hFunc(TEvExternalStorage::TEvDeleteObjectsResponse, Handle);              hFunc(TEvExternalStorage::TEvGetObjectResponse, Handle);              hFunc(TEvExternalStorage::TEvCheckObjectExistsResponse, Handle); @@ -518,18 +555,50 @@ private:          Send(ExternalStorageActorId, new TEvExternalStorage::TEvDeleteObjectRequest(request));      } +    void SendDeleteObjects(const std::vector<TString>& keys) const { +        Aws::Vector<Aws::S3::Model::ObjectIdentifier> awsKeys; +        awsKeys.reserve(keys.size()); +        for (const auto& key : keys) { +            awsKeys.emplace_back(Aws::S3::Model::ObjectIdentifier().WithKey(key)); +        } +        Y_VERIFY(awsKeys.size()); + +        auto request = Aws::S3::Model::DeleteObjectsRequest() +            .WithDelete(Aws::S3::Model::Delete().WithObjects(std::move(awsKeys))); + +        Send(ExternalStorageActorId, new TEvExternalStorage::TEvDeleteObjectsRequest(request)); +    } +      TString LogError(const TString& responseType, const Aws::S3::S3Error& error,                       const std::optional<TString>& key) const { -        TString errStr = TString(error.GetExceptionName()) + " " + error.GetMessage(); +        TString errStr = /*TString(error.GetExceptionName()) + " " +*/ TString(error.GetMessage()); -        LOG_S_NOTICE("[S3] Error in " << responseType << " for key '" << (key ? *key : TString()) -            << "' at tablet " << TabletId << ": " << errStr); +        LOG_S_NOTICE("[S3] Error in " << responseType << " for key '" << (key ? *key : TString()) << ": " << errStr +            << "' at tablet " << TabletId);          if (errStr.empty() && !key) {              errStr = responseType + " with no key";          }          return errStr;      } + +    TString LogError(const TString& responseType, const Aws::S3::S3Error& error, +                     const Aws::Vector<Aws::S3::Model::DeletedObject>& objs) const { +        TString errStr = /*TString(error.GetExceptionName()) + " " +*/ TString(error.GetMessage()); + +        LOG_S_NOTICE("[S3] Error in " << responseType << " for " << ToString(objs) << ": " << errStr +            << " at tablet " << TabletId); +        return errStr; +    } + +    static TString ToString(const Aws::Vector<Aws::S3::Model::DeletedObject>& objs) { +        TStringBuilder ss; +        ss << "keys"; +        for (auto& obj : objs) { +            ss << " '" << obj.GetKey() << "'"; +        } +        return ss; +    }  };  IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName) {  | 
