aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-07-20 14:02:37 +0300
committerchertus <azuikov@ydb.tech>2023-07-20 14:02:37 +0300
commite1c30a1e0cbe18e5731b8f0032523f46353a3025 (patch)
tree3d58a6769489a51bd05e1a70c6cb95b1fa265124
parent71fe3eaed29491cd05552a526c531f5c7c207d50 (diff)
downloadydb-e1c30a1e0cbe18e5731b8f0032523f46353a3025.tar.gz
KIKIMR-18570 check S3 object via Head for consistency
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp9
-rw-r--r--ydb/core/wrappers/events/object_exists.h10
-rw-r--r--ydb/core/wrappers/fake_storage.cpp20
-rw-r--r--ydb/core/wrappers/s3_storage.cpp2
4 files changed, 27 insertions, 14 deletions
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index cb0e4209214..1c5f17d7e65 100644
--- a/ydb/core/tx/tiering/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -283,10 +283,9 @@ public:
const auto& resultOutcome = msg.Result;
if (!resultOutcome.IsSuccess()) {
- KeyFinished(context->GetKey(), true, LogError("CheckObjectExistsResponse", resultOutcome.GetError(), context->GetKey()));
- } else if (!msg.IsExists()) {
SendPutObject(context->GetKey(), std::move(context->DetachData()));
} else {
+ // TODO: check CRC
KeyFinished(context->GetKey(), false, "");
}
}
@@ -486,10 +485,10 @@ private:
}
void SendPutObjectIfNotExists(const TString& key, TString&& data) {
- auto request = Aws::S3::Model::ListObjectsRequest()
- .WithPrefix(key);
+ auto request = Aws::S3::Model::HeadObjectRequest()
+ .WithKey(key);
- LOG_S_DEBUG("[S3] PutObjectIfNotExists->ListObjectsRequest key '" << key << "' at tablet " << TabletId);
+ LOG_S_DEBUG("[S3] PutObjectIfNotExists->HeadObjectRequest key '" << key << "' at tablet " << TabletId);
std::shared_ptr<TEvCheckObjectExistsRequestContext> context = std::make_shared<TEvCheckObjectExistsRequestContext>(key, std::move(data));
Send(ExternalStorageActorId, new TEvExternalStorage::TEvCheckObjectExistsRequest(request, context));
}
diff --git a/ydb/core/wrappers/events/object_exists.h b/ydb/core/wrappers/events/object_exists.h
index d16a06143ef..3fbb5d873f7 100644
--- a/ydb/core/wrappers/events/object_exists.h
+++ b/ydb/core/wrappers/events/object_exists.h
@@ -14,21 +14,21 @@
namespace NKikimr::NWrappers::NExternalStorage {
class TEvCheckObjectExistsRequest: public TGenericRequest<TEvCheckObjectExistsRequest,
- EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest> {
+ EvCheckObjectExistsRequest, Aws::S3::Model::HeadObjectRequest> {
private:
- using TBase = TGenericRequest<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest>;
+ using TBase = TGenericRequest<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest, Aws::S3::Model::HeadObjectRequest>;
public:
using TBase::TBase;
};
class TEvCheckObjectExistsResponse: public TBaseGenericResponse<TEvCheckObjectExistsResponse,
- EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult> {
+ EvCheckObjectExistsResponse, Aws::S3::Model::HeadObjectResult> {
private:
- using TBase = TBaseGenericResponse<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult>;
+ using TBase = TBaseGenericResponse<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse, Aws::S3::Model::HeadObjectResult>;
public:
using TBase::TBase;
bool IsExists() const {
- return Result.IsSuccess() && Result.GetResult().GetContents().size();
+ return Result.IsSuccess();
}
};
}
diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp
index 6b516333ade..e2a14e8b1d2 100644
--- a/ydb/core/wrappers/fake_storage.cpp
+++ b/ydb/core/wrappers/fake_storage.cpp
@@ -178,10 +178,24 @@ void TFakeExternalStorage::Execute(TEvAbortMultipartUploadRequest::TPtr& /*ev*/)
void TFakeExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
- auto awsResult = BuildListObjectsResult(ev->Get()->GetRequest());
- THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(awsResult, ev->Get()->GetRequestContext()));
- TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
+ auto& bucket = GetBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
+ const TString key = AwsToString(ev->Get()->GetRequest().GetKey());
+ auto object = bucket.GetObject(key);
+ std::unique_ptr<TEvCheckObjectExistsResponse> result;
+ if (object) {
+ Aws::S3::Model::HeadObjectResult awsResult;
+ awsResult.SetETag(MD5::Calc(*object));
+ awsResult.SetContentLength(object->size());
+ result.reset(new TEvCheckObjectExistsResponse(awsResult, ev->Get()->GetRequestContext()));
+ Y_VERIFY_DEBUG(result->IsSuccess());
+ } else {
+ Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome;
+ result.reset(new TEvCheckObjectExistsResponse(awsOutcome, ev->Get()->GetRequestContext()));
+ Y_VERIFY_DEBUG(!result->IsSuccess());
+ }
+
+ TlsActivationContext->ActorSystem()->Send(ev->Sender, result.release());
}
void TFakeExternalStorage::Execute(TEvUploadPartCopyRequest::TPtr& /*ev*/) const {
diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp
index 83712d80e8e..394c2436d4e 100644
--- a/ydb/core/wrappers/s3_storage.cpp
+++ b/ydb/core/wrappers/s3_storage.cpp
@@ -270,7 +270,7 @@ void TS3ExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const {
void TS3ExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const {
Call<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse, TContextBase>(
- ev, &S3Client::ListObjectsAsync);
+ ev, &S3Client::HeadObjectAsync);
}
void TS3ExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {