diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-03 15:12:56 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-03 15:12:56 +0300 |
commit | f9328075bb661ee43f8bb9078cdf8fc5bfd3894e (patch) | |
tree | 468c70cabb423a8ece5a2fd3c42d557e116b8f55 | |
parent | bc296b657d770d736b1e43081ce3322268ca34cf (diff) | |
download | ydb-f9328075bb661ee43f8bb9078cdf8fc5bfd3894e.tar.gz |
additional commands for external storage wrapper
-rw-r--r-- | ydb/core/wrappers/abstract.h | 4 | ||||
-rw-r--r-- | ydb/core/wrappers/events/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/wrappers/events/abstract.h | 1 | ||||
-rw-r--r-- | ydb/core/wrappers/events/delete_objects.cpp | 5 | ||||
-rw-r--r-- | ydb/core/wrappers/events/delete_objects.h | 63 | ||||
-rw-r--r-- | ydb/core/wrappers/events/list_objects.h | 4 | ||||
-rw-r--r-- | ydb/core/wrappers/events/s3_out.cpp | 13 | ||||
-rw-r--r-- | ydb/core/wrappers/events/s3_out.h | 17 | ||||
-rw-r--r-- | ydb/core/wrappers/fake_storage.cpp | 48 | ||||
-rw-r--r-- | ydb/core/wrappers/fake_storage.h | 8 | ||||
-rw-r--r-- | ydb/core/wrappers/s3_storage.cpp | 20 | ||||
-rw-r--r-- | ydb/core/wrappers/s3_storage.h | 1 | ||||
-rw-r--r-- | ydb/core/wrappers/s3_wrapper.cpp | 11 |
13 files changed, 176 insertions, 20 deletions
diff --git a/ydb/core/wrappers/abstract.h b/ydb/core/wrappers/abstract.h index b7ed2a68887..4f00486ce7d 100644 --- a/ydb/core/wrappers/abstract.h +++ b/ydb/core/wrappers/abstract.h @@ -2,6 +2,7 @@ #include <ydb/core/base/events.h> #include <ydb/core/wrappers/events/abstract.h> #include <ydb/core/wrappers/events/common.h> +#include <ydb/core/wrappers/events/delete_objects.h> #include <ydb/core/wrappers/events/list_objects.h> #include <ydb/core/wrappers/events/object_exists.h> #include <util/generic/ptr.h> @@ -19,6 +20,8 @@ struct TEvExternalStorage { using TEvPutObjectResponse = NExternalStorage::TEvPutObjectResponse; using TEvDeleteObjectRequest = NExternalStorage::TEvDeleteObjectRequest; using TEvDeleteObjectResponse = NExternalStorage::TEvDeleteObjectResponse; + using TEvDeleteObjectsRequest = NExternalStorage::TEvDeleteObjectsRequest; + using TEvDeleteObjectsResponse = NExternalStorage::TEvDeleteObjectsResponse; using TEvCreateMultipartUploadRequest = NExternalStorage::TEvCreateMultipartUploadRequest; using TEvCreateMultipartUploadResponse = NExternalStorage::TEvCreateMultipartUploadResponse; using TEvUploadPartRequest = NExternalStorage::TEvUploadPartRequest; @@ -43,6 +46,7 @@ public: virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const = 0; virtual void Execute(TEvPutObjectRequest::TPtr& ev) const = 0; virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const = 0; + virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const = 0; virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const = 0; virtual void Execute(TEvUploadPartRequest::TPtr& ev) const = 0; virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const = 0; diff --git a/ydb/core/wrappers/events/CMakeLists.txt b/ydb/core/wrappers/events/CMakeLists.txt index 157bd927c79..5b7aa6402b6 100644 --- a/ydb/core/wrappers/events/CMakeLists.txt +++ b/ydb/core/wrappers/events/CMakeLists.txt @@ -22,6 +22,7 @@ target_sources(core-wrappers-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/list_objects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/object_exists.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/delete_objects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/s3_out.cpp ${CMAKE_SOURCE_DIR}/ydb/core/wrappers/events/abstract.cpp ) diff --git a/ydb/core/wrappers/events/abstract.h b/ydb/core/wrappers/events/abstract.h index 6e648ba8649..2c3aff8ce3a 100644 --- a/ydb/core/wrappers/events/abstract.h +++ b/ydb/core/wrappers/events/abstract.h @@ -21,6 +21,7 @@ public: EV_REQUEST_RESPONSE(HeadObject), EV_REQUEST_RESPONSE(PutObject), EV_REQUEST_RESPONSE(DeleteObject), + EV_REQUEST_RESPONSE(DeleteObjects), EV_REQUEST_RESPONSE(CreateMultipartUpload), EV_REQUEST_RESPONSE(UploadPart), EV_REQUEST_RESPONSE(CompleteMultipartUpload), diff --git a/ydb/core/wrappers/events/delete_objects.cpp b/ydb/core/wrappers/events/delete_objects.cpp new file mode 100644 index 00000000000..e9a26c0ac21 --- /dev/null +++ b/ydb/core/wrappers/events/delete_objects.cpp @@ -0,0 +1,5 @@ +#include "delete_objects.h" + +namespace NKikimr::NWrappers { + +} diff --git a/ydb/core/wrappers/events/delete_objects.h b/ydb/core/wrappers/events/delete_objects.h new file mode 100644 index 00000000000..f4da8e49bbd --- /dev/null +++ b/ydb/core/wrappers/events/delete_objects.h @@ -0,0 +1,63 @@ +#pragma once +#include "abstract.h" +#include "common.h" + +#include <ydb/core/base/events.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> + +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Errors.h> +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsRequest.h> +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsResult.h> +#include <library/cpp/actors/core/event_local.h> +#include <util/generic/ptr.h> + +namespace NKikimr::NWrappers::NExternalStorage { + + class TEvDeleteObjectsRequest: public TEventLocal<TEvDeleteObjectsRequest, EvDeleteObjectsRequest> { + public: + using TRequest = Aws::S3::Model::DeleteObjectsRequest; + private: + TRequest Request; + public: + TEvDeleteObjectsRequest(const TRequest& request) + : Request(request) { + + } + IRequestContext::TPtr GetRequestContext() const { + return nullptr; + } + const TRequest& GetRequest() const { + return Request; + } + TRequest* operator->() { + return &Request; + } + }; + class TEvDeleteObjectsResponse: public TEventLocal<TEvDeleteObjectsResponse, EvDeleteObjectsResponse> { + public: + using TResult = Aws::S3::Model::DeleteObjectsResult; + using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>; + using TKey = std::optional<TString>; + private: + TOutcome Outcome; + public: + TEvDeleteObjectsResponse(const TOutcome& result) + : Outcome(result) + { + + } + bool IsSuccess() const { + return Outcome.IsSuccess(); + } + const Aws::S3::S3Error& GetError() const { + return Outcome.GetError(); + } + const TResult& GetResult() const { + return Outcome.GetResult(); + } + const TResult* operator->() const { + return &Outcome.GetResult(); + } + }; + +} diff --git a/ydb/core/wrappers/events/list_objects.h b/ydb/core/wrappers/events/list_objects.h index 9da75e0d610..714296c5574 100644 --- a/ydb/core/wrappers/events/list_objects.h +++ b/ydb/core/wrappers/events/list_objects.h @@ -19,6 +19,10 @@ namespace NKikimr::NWrappers::NExternalStorage { private: TRequest Request; public: + TEvListObjectsRequest(const TRequest& request) + : Request(request) { + + } IRequestContext::TPtr GetRequestContext() const { return nullptr; } diff --git a/ydb/core/wrappers/events/s3_out.cpp b/ydb/core/wrappers/events/s3_out.cpp index 8a17f59c226..3fdc13145ba 100644 --- a/ydb/core/wrappers/events/s3_out.cpp +++ b/ydb/core/wrappers/events/s3_out.cpp @@ -128,6 +128,19 @@ void Out(IOutputStream& out, const DeleteObjectOutcome& outcome) { OutOutcome(out, outcome); } +void Out(IOutputStream& out, const DeleteObjectsRequest& request) { + using T = DeleteObjectsRequest; + OutRequest(out, request, { &Bucket<T> }); +} + +void Out(IOutputStream& out, const DeleteObjectsResult& result) { + OutResult(out, result, "DeleteObjectsResult", {}); +} + +void Out(IOutputStream& out, const DeleteObjectsOutcome& outcome) { + OutOutcome(out, outcome); +} + void Out(IOutputStream& out, const CreateMultipartUploadRequest& request) { using T = CreateMultipartUploadRequest; OutRequest(out, request, {&Bucket<T>, &Key<T>}); diff --git a/ydb/core/wrappers/events/s3_out.h b/ydb/core/wrappers/events/s3_out.h index e9979b39b48..5dc7a29cab8 100644 --- a/ydb/core/wrappers/events/s3_out.h +++ b/ydb/core/wrappers/events/s3_out.h @@ -8,6 +8,7 @@ #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h> +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectsRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h> @@ -36,6 +37,10 @@ void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectRequest& request) void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectResult& result); void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectOutcome& outcome); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsRequest& request); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsResult& result); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectsOutcome& outcome); + void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadRequest& request); void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadResult& result); void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadOutcome& outcome); @@ -85,6 +90,18 @@ Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::ListObjectsOutcome, out, value) { NKikimr::NWrappers::Out(out, value); } +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsRequest, out, value) { + NKikimr::NWrappers::Out(out, value); +} + +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsResult, out, value) { + NKikimr::NWrappers::Out(out, value); +} + +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectsOutcome, out, value) { + NKikimr::NWrappers::Out(out, value); +} + Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::HeadObjectRequest, out, value) { NKikimr::NWrappers::Out(out, value); } diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp index d90aacf674a..147306aeb1b 100644 --- a/ydb/core/wrappers/fake_storage.cpp +++ b/ydb/core/wrappers/fake_storage.cpp @@ -48,27 +48,22 @@ void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { auto& awsPrefix = ev->Get()->GetRequest().GetPrefix(); const TString prefix(awsPrefix.data(), awsPrefix.size()); THolder<TEvListObjectsResponse> result; - if (!prefix) { - TEvListObjectsResponse::TOutcome awsOutcome; - result = MakeHolder<TEvListObjectsResponse>(awsOutcome); - TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); - } else { - TGuard<TMutex> g(Mutex); - TEvListObjectsResponse::TResult awsResult; - for (auto&& i : Data) { - if (!i.first.StartsWith(prefix)) { - continue; - } else { - Aws::S3::Model::Object objectMeta; - objectMeta.SetKey(i.first); - objectMeta.SetSize(i.second.size()); - awsResult.AddContents(std::move(objectMeta)); - } + TGuard<TMutex> g(Mutex); + TEvListObjectsResponse::TResult awsResult; + for (auto&& i : Data) { + if (!!prefix && !i.first.StartsWith(prefix)) { + continue; } - TEvListObjectsResponse::TOutcome awsOutcome(awsResult); - result = MakeHolder<TEvListObjectsResponse>(awsOutcome); - TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); + Aws::S3::Model::Object objectMeta; + objectMeta.SetKey(i.first); + objectMeta.SetSize(i.second.size()); + awsResult.AddContents(std::move(objectMeta)); + break; } + awsResult.SetIsTruncated(Data.size() > 1); + TEvListObjectsResponse::TOutcome awsOutcome(awsResult); + result = MakeHolder<TEvListObjectsResponse>(awsOutcome); + TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const { @@ -149,6 +144,21 @@ void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } +void TFakeExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const { + TGuard<TMutex> g(Mutex); + Aws::S3::Model::DeleteObjectsResult awsResult; + for (auto&& awsKey : ev->Get()->GetRequest().GetDelete().GetObjects()) { + const TString key(awsKey.GetKey().data(), awsKey.GetKey().size()); + Data.erase(key); + Aws::S3::Model::DeletedObject dObject; + dObject.WithKey(key); + awsResult.AddDeleted(std::move(dObject)); + } + + THolder<TEvDeleteObjectsResponse> result(new TEvDeleteObjectsResponse(awsResult)); + TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); +} + void TFakeExternalStorage::Execute(TEvCreateMultipartUploadRequest::TPtr& /*ev*/) const { } diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h index fcf50f29a23..d14a0606b8e 100644 --- a/ydb/core/wrappers/fake_storage.h +++ b/ydb/core/wrappers/fake_storage.h @@ -19,11 +19,16 @@ private: mutable TMap<TString, TString> Data; public: TFakeExternalStorage() = default; + TMap<TString, TString> GetData() const { + TGuard<TMutex> g(Mutex); + return Data; + } void Execute(TEvListObjectsRequest::TPtr& ev) const; void Execute(TEvGetObjectRequest::TPtr& ev) const; void Execute(TEvHeadObjectRequest::TPtr& ev) const; void Execute(TEvPutObjectRequest::TPtr& ev) const; void Execute(TEvDeleteObjectRequest::TPtr& ev) const; + void Execute(TEvDeleteObjectsRequest::TPtr& ev) const; void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const; void Execute(TEvUploadPartRequest::TPtr& ev) const; void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const; @@ -52,6 +57,9 @@ public: virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override { Singleton<TFakeExternalStorage>()->Execute(ev); } + virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override { + Singleton<TFakeExternalStorage>()->Execute(ev); + } virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override { Singleton<TFakeExternalStorage>()->Execute(ev); } diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp index 994bf81b492..86cb5a86f21 100644 --- a/ydb/core/wrappers/s3_storage.cpp +++ b/ydb/core/wrappers/s3_storage.cpp @@ -98,6 +98,21 @@ public: }; template <> +class TContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse> + : public TCommonContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse> { +private: + using TBase = TCommonContextBase<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse>; +public: + using TBase::Send; + using TBase::TBase; + void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const { + Y_VERIFY(!std::exchange(TBase::Replied, true), "Double-reply"); + + Send(MakeHolder<TEvDeleteObjectsResponse>(outcome).Release()); + } +}; + +template <> class TContextBase<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse> : public TCommonContextBase<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse> { private: @@ -261,6 +276,11 @@ void TS3ExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { ev, &S3Client::DeleteObjectAsync); } +void TS3ExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const { + Call<TEvDeleteObjectsRequest, TEvDeleteObjectsResponse, TContextBase>( + ev, &S3Client::DeleteObjectsAsync); +} + void TS3ExternalStorage::Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const { Call<TEvCreateMultipartUploadRequest, TEvCreateMultipartUploadResponse, TContextBase>( ev, &S3Client::CreateMultipartUploadAsync); diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h index b09229f16c4..11009c2171a 100644 --- a/ydb/core/wrappers/s3_storage.h +++ b/ydb/core/wrappers/s3_storage.h @@ -78,6 +78,7 @@ public: virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const override; virtual void Execute(TEvPutObjectRequest::TPtr& ev) const override; virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override; + virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override; virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override; virtual void Execute(TEvUploadPartRequest::TPtr& ev) const override; virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const override; diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp index 476d35a862a..42474eb605f 100644 --- a/ydb/core/wrappers/s3_wrapper.cpp +++ b/ydb/core/wrappers/s3_wrapper.cpp @@ -23,6 +23,10 @@ namespace NKikimr::NWrappers { namespace NExternalStorage { class TS3Wrapper: public TActor<TS3Wrapper> { + void Handle(TEvListObjectsRequest::TPtr& ev) { + CSOperator->Execute(ev); + } + void Handle(TEvGetObjectRequest::TPtr& ev) { CSOperator->Execute(ev); } @@ -43,6 +47,10 @@ class TS3Wrapper: public TActor<TS3Wrapper> { CSOperator->Execute(ev); } + void Handle(TEvDeleteObjectsRequest::TPtr& ev) { + CSOperator->Execute(ev); + } + void Handle(TEvCreateMultipartUploadRequest::TPtr& ev) { CSOperator->Execute(ev); } @@ -73,17 +81,18 @@ public: STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { + hFunc(TEvListObjectsRequest, Handle); hFunc(TEvGetObjectRequest, Handle); hFunc(TEvHeadObjectRequest, Handle); hFunc(TEvPutObjectRequest, Handle); hFunc(TEvDeleteObjectRequest, Handle); + hFunc(TEvDeleteObjectsRequest, Handle); hFunc(TEvCreateMultipartUploadRequest, Handle); hFunc(TEvUploadPartRequest, Handle); hFunc(TEvCompleteMultipartUploadRequest, Handle); hFunc(TEvAbortMultipartUploadRequest, Handle); hFunc(TEvCheckObjectExistsRequest, Handle); - cFunc(TEvents::TEvPoison::EventType, PassAway); } } |