aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-03 15:12:56 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-03 15:12:56 +0300
commitf9328075bb661ee43f8bb9078cdf8fc5bfd3894e (patch)
tree468c70cabb423a8ece5a2fd3c42d557e116b8f55
parentbc296b657d770d736b1e43081ce3322268ca34cf (diff)
downloadydb-f9328075bb661ee43f8bb9078cdf8fc5bfd3894e.tar.gz
additional commands for external storage wrapper
-rw-r--r--ydb/core/wrappers/abstract.h4
-rw-r--r--ydb/core/wrappers/events/CMakeLists.txt1
-rw-r--r--ydb/core/wrappers/events/abstract.h1
-rw-r--r--ydb/core/wrappers/events/delete_objects.cpp5
-rw-r--r--ydb/core/wrappers/events/delete_objects.h63
-rw-r--r--ydb/core/wrappers/events/list_objects.h4
-rw-r--r--ydb/core/wrappers/events/s3_out.cpp13
-rw-r--r--ydb/core/wrappers/events/s3_out.h17
-rw-r--r--ydb/core/wrappers/fake_storage.cpp48
-rw-r--r--ydb/core/wrappers/fake_storage.h8
-rw-r--r--ydb/core/wrappers/s3_storage.cpp20
-rw-r--r--ydb/core/wrappers/s3_storage.h1
-rw-r--r--ydb/core/wrappers/s3_wrapper.cpp11
13 files changed, 176 insertions, 20 deletions
diff --git a/ydb/core/wrappers/abstract.h b/ydb/core/wrappers/abstract.h
index b7ed2a68887..4f00486ce7d 100644
--- a/ydb/core/wrappers/abstract.h
+++ b/ydb/core/wrappers/abstract.h
@@ -2,6 +2,7 @@
#include <ydb/core/base/events.h>
#include <ydb/core/wrappers/events/abstract.h>
#include <ydb/core/wrappers/events/common.h>
+#include <ydb/core/wrappers/events/delete_objects.h>
#include <ydb/core/wrappers/events/list_objects.h>
#include <ydb/core/wrappers/events/object_exists.h>
#include <util/generic/ptr.h>
@@ -19,6 +20,8 @@ struct TEvExternalStorage {
using TEvPutObjectResponse = NExternalStorage::TEvPutObjectResponse;
using TEvDeleteObjectRequest = NExternalStorage::TEvDeleteObjectRequest;
using TEvDeleteObjectResponse = NExternalStorage::TEvDeleteObjectResponse;
+ using TEvDeleteObjectsRequest = NExternalStorage::TEvDeleteObjectsRequest;
+ using TEvDeleteObjectsResponse = NExternalStorage::TEvDeleteObjectsResponse;
using TEvCreateMultipartUploadRequest = NExternalStorage::TEvCreateMultipartUploadRequest;
using TEvCreateMultipartUploadResponse = NExternalStorage::TEvCreateMultipartUploadResponse;
using TEvUploadPartRequest = NExternalStorage::TEvUploadPartRequest;
@@ -43,6 +46,7 @@ public:
virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const = 0;
virtual void Execute(TEvPutObjectRequest::TPtr& ev) const = 0;
virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const = 0;
+ virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const = 0;
virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const = 0;
virtual void Execute(TEvUploadPartRequest::TPtr& ev) const = 0;
virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const = 0;
diff --git a/ydb/core/wrappers/events/CMakeLists.txt b/ydb/core/wrappers/events/CMakeLists.txt
index 157bd927c79..5b7aa6402b6 100644
--- a/ydb/core/wrappers/events/CMakeLists.txt
+++ b/ydb/core/wrappers/events/CMakeLists.txt
@@ -22,6 +22,7 @@ target_sources(core-wrappers-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/list_objects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/object_exists.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/delete_objects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/s3_out.cpp
${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/abstract.cpp
)
diff --git a/ydb/core/wrappers/events/abstract.h b/ydb/core/wrappers/events/abstract.h
index 6e648ba8649..2c3aff8ce3a 100644
--- a/ydb/core/wrappers/events/abstract.h
+++ b/ydb/core/wrappers/events/abstract.h
@@ -21,6 +21,7 @@ public:
EV_REQUEST_RESPONSE(HeadObject),
EV_REQUEST_RESPONSE(PutObject),
EV_REQUEST_RESPONSE(DeleteObject),
+ EV_REQUEST_RESPONSE(DeleteObjects),
EV_REQUEST_RESPONSE(CreateMultipartUpload),
EV_REQUEST_RESPONSE(UploadPart),
EV_REQUEST_RESPONSE(CompleteMultipartUpload),
diff --git a/ydb/core/wrappers/events/delete_objects.cpp b/ydb/core/wrappers/events/delete_objects.cpp
new file mode 100644
index 00000000000..e9a26c0ac21
--- /dev/null
+++ b/ydb/core/wrappers/events/delete_objects.cpp
@@ -0,0 +1,5 @@
+#include "delete_objects.h"
+
+namespace NKikimr::NWrappers {
+
+}
diff --git a/ydb/core/wrappers/events/delete_objects.h b/ydb/core/wrappers/events/delete_objects.h
new file mode 100644
index 00000000000..f4da8e49bbd
--- /dev/null
+++ b/ydb/core/wrappers/events/delete_objects.h
@@ -0,0 +1,63 @@
+#pragma once
+#include "abstract.h"
+#include "common.h"
+
+#include <ydb/core/base/events.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Errors.h>
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsRequest.h>
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsResult.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <util/generic/ptr.h>
+
+namespace NKikimr::NWrappers::NExternalStorage {
+
+ class TEvDeleteObjectsRequest: public TEventLocal<TEvDeleteObjectsRequest, EvDeleteObjectsRequest> {
+ public:
+ using TRequest = Aws::S3::Model::DeleteObjectsRequest;
+ private:
+ TRequest Request;
+ public:
+ TEvDeleteObjectsRequest(const TRequest& request)
+ : Request(request) {
+
+ }
+ IRequestContext::TPtr GetRequestContext() const {
+ return nullptr;
+ }
+ const TRequest& GetRequest() const {
+ return Request;
+ }
+ TRequest* operator->() {
+ return &Request;
+ }
+ };
+ class TEvDeleteObjectsResponse: public TEventLocal<TEvDeleteObjectsResponse, EvDeleteObjectsResponse> {
+ public:
+ using TResult = Aws::S3::Model::DeleteObjectsResult;
+ using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>;
+ using TKey = std::optional<TString>;
+ private:
+ TOutcome Outcome;
+ public:
+ TEvDeleteObjectsResponse(const TOutcome& result)
+ : Outcome(result)
+ {
+
+ }
+ bool IsSuccess() const {
+ return Outcome.IsSuccess();
+ }
+ const Aws::S3::S3Error& GetError() const {
+ return Outcome.GetError();
+ }
+ const TResult& GetResult() const {
+ return Outcome.GetResult();
+ }
+ const TResult* operator->() const {
+ return &Outcome.GetResult();
+ }
+ };
+
+}
diff --git a/ydb/core/wrappers/events/list_objects.h b/ydb/core/wrappers/events/list_objects.h
index 9da75e0d610..714296c5574 100644
--- a/ydb/core/wrappers/events/list_objects.h
+++ b/ydb/core/wrappers/events/list_objects.h
@@ -19,6 +19,10 @@ namespace NKikimr::NWrappers::NExternalStorage {
private:
TRequest Request;
public:
+ TEvListObjectsRequest(const TRequest& request)
+ : Request(request) {
+
+ }
IRequestContext::TPtr GetRequestContext() const {
return nullptr;
}
diff --git a/ydb/core/wrappers/events/s3_out.cpp b/ydb/core/wrappers/events/s3_out.cpp
index 8a17f59c226..3fdc13145ba 100644
--- a/ydb/core/wrappers/events/s3_out.cpp
+++ b/ydb/core/wrappers/events/s3_out.cpp
@@ -128,6 +128,19 @@ void Out(IOutputStream& out, const DeleteObjectOutcome& outcome) {
OutOutcome(out, outcome);
}
+void Out(IOutputStream& out, const DeleteObjectsRequest& request) {
+ using T = DeleteObjectsRequest;
+ OutRequest(out, request, { &Bucket<T> });
+}
+
+void Out(IOutputStream& out, const DeleteObjectsResult& result) {
+ OutResult(out, result, "DeleteObjectsResult", {});
+}
+
+void Out(IOutputStream& out, const DeleteObjectsOutcome& outcome) {
+ OutOutcome(out, outcome);
+}
+
void Out(IOutputStream& out, const CreateMultipartUploadRequest& request) {
using T = CreateMultipartUploadRequest;
OutRequest(out, request, {&Bucket<T>, &Key<T>});
diff --git a/ydb/core/wrappers/events/s3_out.h b/ydb/core/wrappers/events/s3_out.h
index e9979b39b48..5dc7a29cab8 100644
--- a/ydb/core/wrappers/events/s3_out.h
+++ b/ydb/core/wrappers/events/s3_out.h
@@ -8,6 +8,7 @@
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h>
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h>
@@ -36,6 +37,10 @@ void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectRequest& request)
void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectResult& result);
void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectOutcome& outcome);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsRequest& request);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsResult& result);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsOutcome& outcome);
+
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadRequest& request);
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadResult& result);
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadOutcome& outcome);
@@ -85,6 +90,18 @@ Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::ListObjectsOutcome, out, value) {
NKikimr::NWrappers::Out(out, value);
}
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsRequest, out, value) {
+ NKikimr::NWrappers::Out(out, value);
+}
+
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsResult, out, value) {
+ NKikimr::NWrappers::Out(out, value);
+}
+
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsOutcome, out, value) {
+ NKikimr::NWrappers::Out(out, value);
+}
+
Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::HeadObjectRequest, out, value) {
NKikimr::NWrappers::Out(out, value);
}
diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp
index d90aacf674a..147306aeb1b 100644
--- a/ydb/core/wrappers/fake_storage.cpp
+++ b/ydb/core/wrappers/fake_storage.cpp
@@ -48,27 +48,22 @@ void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {
auto& awsPrefix = ev->Get()->GetRequest().GetPrefix();
const TString prefix(awsPrefix.data(), awsPrefix.size());
THolder<TEvListObjectsResponse> result;
- if (!prefix) {
- TEvListObjectsResponse::TOutcome awsOutcome;
- result = MakeHolder<TEvListObjectsResponse>(awsOutcome);
- TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
- } else {
- TGuard<TMutex> g(Mutex);
- TEvListObjectsResponse::TResult awsResult;
- for (auto&& i : Data) {
- if (!i.first.StartsWith(prefix)) {
- continue;
- } else {
- Aws::S3::Model::Object objectMeta;
- objectMeta.SetKey(i.first);
- objectMeta.SetSize(i.second.size());
- awsResult.AddContents(std::move(objectMeta));
- }
+ TGuard<TMutex> g(Mutex);
+ TEvListObjectsResponse::TResult awsResult;
+ for (auto&& i : Data) {
+ if (!!prefix && !i.first.StartsWith(prefix)) {
+ continue;
}
- TEvListObjectsResponse::TOutcome awsOutcome(awsResult);
- result = MakeHolder<TEvListObjectsResponse>(awsOutcome);
- TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
+ Aws::S3::Model::Object objectMeta;
+ objectMeta.SetKey(i.first);
+ objectMeta.SetSize(i.second.size());
+ awsResult.AddContents(std::move(objectMeta));
+ break;
}
+ awsResult.SetIsTruncated(Data.size() > 1);
+ TEvListObjectsResponse::TOutcome awsOutcome(awsResult);
+ result = MakeHolder<TEvListObjectsResponse>(awsOutcome);
+ TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const {
@@ -149,6 +144,21 @@ void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const {
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
+void TFakeExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const {
+ TGuard<TMutex> g(Mutex);
+ Aws::S3::Model::DeleteObjectsResult awsResult;
+ for (auto&& awsKey : ev->Get()->GetRequest().GetDelete().GetObjects()) {
+ const TString key(awsKey.GetKey().data(), awsKey.GetKey().size());
+ Data.erase(key);
+ Aws::S3::Model::DeletedObject dObject;
+ dObject.WithKey(key);
+ awsResult.AddDeleted(std::move(dObject));
+ }
+
+ THolder<TEvDeleteObjectsResponse> result(new TEvDeleteObjectsResponse(awsResult));
+ TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
+}
+
void TFakeExternalStorage::Execute(TEvCreateMultipartUploadRequest::TPtr& /*ev*/) const {
}
diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h
index fcf50f29a23..d14a0606b8e 100644
--- a/ydb/core/wrappers/fake_storage.h
+++ b/ydb/core/wrappers/fake_storage.h
@@ -19,11 +19,16 @@ private:
mutable TMap<TString, TString> Data;
public:
TFakeExternalStorage() = default;
+ TMap<TString, TString> GetData() const {
+ TGuard<TMutex> g(Mutex);
+ return Data;
+ }
void Execute(TEvListObjectsRequest::TPtr& ev) const;
void Execute(TEvGetObjectRequest::TPtr& ev) const;
void Execute(TEvHeadObjectRequest::TPtr& ev) const;
void Execute(TEvPutObjectRequest::TPtr& ev) const;
void Execute(TEvDeleteObjectRequest::TPtr& ev) const;
+ void Execute(TEvDeleteObjectsRequest::TPtr& ev) const;
void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const;
void Execute(TEvUploadPartRequest::TPtr& ev) const;
void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const;
@@ -52,6 +57,9 @@ public:
virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override {
Singleton<TFakeExternalStorage>()->Execute(ev);
}
+ virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override {
+ Singleton<TFakeExternalStorage>()->Execute(ev);
+ }
virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override {
Singleton<TFakeExternalStorage>()->Execute(ev);
}
diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp
index 994bf81b492..86cb5a86f21 100644
--- a/ydb/core/wrappers/s3_storage.cpp
+++ b/ydb/core/wrappers/s3_storage.cpp
@@ -98,6 +98,21 @@ public:
};
template <>
+class TContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse>
+ : public TCommonContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse> {
+private:
+ using TBase = TCommonContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse>;
+public:
+ using TBase::Send;
+ using TBase::TBase;
+ void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const {
+ Y_VERIFY(!std::exchange(TBase::Replied, true), "Double-reply");
+
+ Send(MakeHolder<TEvDeleteObjectsResponse>(outcome).Release());
+ }
+};
+
+template <>
class TContextBase<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse>
: public TCommonContextBase<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse> {
private:
@@ -261,6 +276,11 @@ void TS3ExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const {
ev, &S3Client::DeleteObjectAsync);
}
+void TS3ExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const {
+ Call<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse, TContextBase>(
+ ev, &S3Client::DeleteObjectsAsync);
+}
+
void TS3ExternalStorage::Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const {
Call<TEvCreateMultipartUploadRequest, TEvCreateMultipartUploadResponse, TContextBase>(
ev, &S3Client::CreateMultipartUploadAsync);
diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h
index b09229f16c4..11009c2171a 100644
--- a/ydb/core/wrappers/s3_storage.h
+++ b/ydb/core/wrappers/s3_storage.h
@@ -78,6 +78,7 @@ public:
virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const override;
virtual void Execute(TEvPutObjectRequest::TPtr& ev) const override;
virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override;
+ virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override;
virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override;
virtual void Execute(TEvUploadPartRequest::TPtr& ev) const override;
virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const override;
diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp
index 476d35a862a..42474eb605f 100644
--- a/ydb/core/wrappers/s3_wrapper.cpp
+++ b/ydb/core/wrappers/s3_wrapper.cpp
@@ -23,6 +23,10 @@ namespace NKikimr::NWrappers {
namespace NExternalStorage {
class TS3Wrapper: public TActor<TS3Wrapper> {
+ void Handle(TEvListObjectsRequest::TPtr& ev) {
+ CSOperator->Execute(ev);
+ }
+
void Handle(TEvGetObjectRequest::TPtr& ev) {
CSOperator->Execute(ev);
}
@@ -43,6 +47,10 @@ class TS3Wrapper: public TActor<TS3Wrapper> {
CSOperator->Execute(ev);
}
+ void Handle(TEvDeleteObjectsRequest::TPtr& ev) {
+ CSOperator->Execute(ev);
+ }
+
void Handle(TEvCreateMultipartUploadRequest::TPtr& ev) {
CSOperator->Execute(ev);
}
@@ -73,17 +81,18 @@ public:
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvListObjectsRequest, Handle);
hFunc(TEvGetObjectRequest, Handle);
hFunc(TEvHeadObjectRequest, Handle);
hFunc(TEvPutObjectRequest, Handle);
hFunc(TEvDeleteObjectRequest, Handle);
+ hFunc(TEvDeleteObjectsRequest, Handle);
hFunc(TEvCreateMultipartUploadRequest, Handle);
hFunc(TEvUploadPartRequest, Handle);
hFunc(TEvCompleteMultipartUploadRequest, Handle);
hFunc(TEvAbortMultipartUploadRequest, Handle);
hFunc(TEvCheckObjectExistsRequest, Handle);
-
cFunc(TEvents::TEvPoison::EventType, PassAway);
}
}