diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 13:26:35 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 13:26:35 +0300 |
commit | 33e0f47c0e8e26c4778d83646252b59c717a4c7b (patch) | |
tree | 0407e2db48571e7f60d427e9fa1fe357fe932a68 | |
parent | f570e9e1e98392e10ddfe79909f0ad949f9dec8f (diff) | |
download | ydb-33e0f47c0e8e26c4778d83646252b59c717a4c7b.tar.gz |
bucket is property for external storage config->external storage
improve s3 tests usage
use special subscriber id for same mailbox problem resolving in tiering
30 files changed, 372 insertions, 325 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index e0d00c47671..1b3782b5d47 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -150,6 +150,8 @@ struct TKikimrEvents : TEvents { ES_DATASHARD_LOAD, ES_METADATA_PROVIDER, ES_INTERNAL_REQUEST, + ES_BACKGROUND_TASKS, + ES_TIERING }; }; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 4dd801466b4..c01581a8bb6 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -331,6 +331,12 @@ enum EServiceKikimr { // MetadataProvider METADATA_PROVIDER = 1500; + // Tiering + TX_TIERING = 1600; + + // Background tasks + BG_TASKS = 1700; + }; message TActivity { diff --git a/ydb/core/tx/columnshard/blob.cpp b/ydb/core/tx/columnshard/blob.cpp index c7454238bfe..b0771a75f0f 100644 --- a/ydb/core/tx/columnshard/blob.cpp +++ b/ydb/core/tx/columnshard/blob.cpp @@ -132,23 +132,15 @@ TUnifiedBlobId ParseSmallBlobId(const TString& s, TString& error) { return TUnifiedBlobId(tabletId, gen, step, cookie, size); } -// Format: "S3_key|bucket" +// Format: "s = S3_key" TUnifiedBlobId ParseS3BlobId(const TString& s, TString& error) { - TVector<TString> keyBucket; - Split(s, "|", keyBucket); - - if (keyBucket.size() != 2) { - error = TStringBuilder() << "Wrong S3 id '" << s << "'"; - return TUnifiedBlobId(); - } - ui64 pathId; - TUnifiedBlobId dsBlobId = S3KeyToDsId(keyBucket[0], error, pathId); + TUnifiedBlobId dsBlobId = S3KeyToDsId(s, error, pathId); if (!dsBlobId.IsValid()) { return TUnifiedBlobId(); } - return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, keyBucket[1], pathId); + return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, pathId); } } diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index 00228e7071c..c0f0e723531 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -87,13 +87,11 @@ class TUnifiedBlobId { struct TS3BlobId { TDsBlobId DsBlobId; - TString Bucket; TString Key; TS3BlobId() = default; - TS3BlobId(const TUnifiedBlobId& dsBlob, const TString& bucket, const ui64 pathId) - : Bucket(bucket) + TS3BlobId(const TUnifiedBlobId& dsBlob, const ui64 pathId) { Y_VERIFY(dsBlob.IsDsBlob()); DsBlobId = std::get<TDsBlobId>(dsBlob.Id); @@ -101,15 +99,15 @@ class TUnifiedBlobId { } bool operator == (const TS3BlobId& other) const { - return Bucket == other.Bucket && Key == other.Key; + return Key == other.Key; } TString ToStringNew() const { - return Sprintf("%s|%s", Key.c_str(), Bucket.c_str()); + return Sprintf("%s", Key.c_str()); } ui64 Hash() const { - return CombineHashes<ui64>(THash<TString>()(Bucket), THash<TString>()(Key)); + return IntHash(THash<TString>()(Key)); } }; @@ -143,8 +141,8 @@ public: {} // Make S3 blob Id from DS one - TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const TString& bucket, const ui64 pathId) - : Id(TS3BlobId(blob, bucket, pathId)) + TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const ui64 pathId) + : Id(TS3BlobId(blob, pathId)) { Y_VERIFY(type == S3_BLOB); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 6b672049d45..0cbfb6079ee 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -5,6 +5,7 @@ #include <ydb/core/tx/tiering/external_data.h> #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/services/metadata/service.h> +#include <ydb/core/tx/tiering/manager.h> namespace NKikimr::NColumnShard { @@ -951,7 +952,7 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl EnableTiering = schema.GetEnableTiering(); if (OwnerPath && !Tiers && EnableTiering) { - Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath); + Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath); } if (!!Tiers) { if (EnableTiering) { @@ -1044,7 +1045,7 @@ TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const { void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { if (!Tiers) { - Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath); + Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath); } Tiers->TakeConfigs(ev->Get()->GetSnapshot()); if (EnableTiering) { diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 9cf3766c597..7dd77b12ed2 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -595,7 +595,7 @@ void TestTwoHotTiers(bool reboot) { } void TestHotAndColdTiers(bool reboot) { - TString bucket = "ydb"; + const TString bucket = "tiering-test-01"; TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -612,9 +612,10 @@ void TestHotAndColdTiers(bool reboot) { s3Config.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP); s3Config.SetVerifySSL(false); -#if 0 + s3Config.SetBucket(bucket); +//#define S3_TEST_USAGE +#ifdef S3_TEST_USAGE s3Config.SetEndpoint("storage.cloud-preprod.yandex.net"); - s3Config.SetBucket("tiering-test-01"); s3Config.SetAccessKey("..."); s3Config.SetSecretKey("..."); s3Config.SetProxyHost("localhost"); @@ -622,7 +623,6 @@ void TestHotAndColdTiers(bool reboot) { s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP); #else s3Config.SetEndpoint("fake"); - s3Config.SetBucket(bucket); #endif s3Config.SetRequestTimeoutMs(10000); s3Config.SetHttpRequestTimeoutMs(10000); diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h index 1acce848bc4..8ebc1a79ffc 100644 --- a/ydb/core/tx/datashard/export_s3_base_uploader.h +++ b/ydb/core/tx/datashard/export_s3_base_uploader.h @@ -79,7 +79,6 @@ protected: google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); auto request = Aws::S3::Model::PutObjectRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetSchemeKey()) .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); @@ -145,7 +144,6 @@ protected: void UploadData() { if (!MultiPart) { auto request = Aws::S3::Model::PutObjectRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); @@ -156,7 +154,6 @@ protected: } auto request = Aws::S3::Model::UploadPartRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId) .WithPartNumber(Parts.size() + 1); @@ -187,7 +184,6 @@ protected: if (!upload) { auto request = Aws::S3::Model::CreateMultipartUploadRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request)); @@ -209,7 +205,6 @@ protected: } auto request = Aws::S3::Model::CompleteMultipartUploadRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId) .WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts))); @@ -224,7 +219,6 @@ protected: } auto request = Aws::S3::Model::AbortMultipartUploadRequest() - .WithBucket(Settings.GetBucket()) .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId); this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request)); diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index ba4f23c09d0..cb91713f93a 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -357,7 +357,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ": key# " << key); auto request = Model::HeadObjectRequest() - .WithBucket(Settings.GetBucket()) .WithKey(key); Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request)); @@ -369,7 +368,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ", range# " << range.first << "-" << range.second); auto request = Model::GetObjectRequest() - .WithBucket(Settings.GetBucket()) .WithKey(key) .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index a95794cdcf9..5819f9f5567 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -29,7 +29,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { void HeadObject(const TString& key) { auto request = Model::HeadObjectRequest() - .WithBucket(ImportInfo->Settings.bucket()) .WithKey(key); Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request)); @@ -52,7 +51,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { void GetObject(const TString& key, const std::pair<ui64, ui64>& range) { auto request = Model::GetObjectRequest() - .WithBucket(ImportInfo->Settings.bucket()) .WithKey(key) .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); @@ -137,7 +135,6 @@ public: if (Client) { Send(Client, new TEvents::TEvPoisonPill()); } - Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); HeadObject(SchemeKey); diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index 114b634406a..a9cd24a2c75 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -30,7 +30,7 @@ public: } void Bootstrap() { Become(&TThis::StateMain); - Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation())); + Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation(), SelfId())); } void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { auto snapshot = ev->Get()->GetSnapshot(); @@ -79,7 +79,7 @@ bool TManager::Start() { #ifndef KIKIMR_DISABLE_S3_OPS auto& ctx = TActivationContext::AsActorContext(); const NActors::TActorId newActor = ctx.Register( - CreateS3Actor(TabletId, ctx.SelfID, Config.GetTierName()) + CreateS3Actor(TabletId, TabletActorId, Config.GetTierName()) ); ctx.Send(newActor, new TEvPrivate::TEvS3Settings(Config.GetProtoConfig().GetObjectStorage())); Stop(); @@ -88,9 +88,11 @@ bool TManager::Start() { return true; } -TManager::TManager(const ui64 tabletId, const TTierConfig& config) +TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config) : TabletId(tabletId) - , Config(config) { + , TabletActorId(tabletActorId) + , Config(config) +{ } NOlap::TStorageTier TManager::BuildTierStorage() const { @@ -147,7 +149,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) if (Managers.contains(i.second.GetTierName())) { continue; } - NTiers::TManager localManager(TabletId, i.second); + NTiers::TManager localManager(TabletId, TabletActorId, i.second); auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second; if (IsActive()) { manager.Start(); @@ -158,12 +160,12 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) TActorId TTiersManager::GetStorageActorId(const TString& tierName) { auto it = Managers.find(tierName); if (it == Managers.end()) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId; + ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId; return {}; } auto actorId = it->second.GetStorageActorId(); if (!actorId) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId; + ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId; return {}; } return actorId; @@ -234,4 +236,12 @@ TTiersManager::~TTiersManager() { } } +TActorId TTiersManager::GetActorId() const { + if (Actor) { + return Actor->SelfId(); + } else { + return {}; + } +} + } diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h index 4313aefce42..008682a8917 100644 --- a/ydb/core/tx/tiering/manager.h +++ b/ydb/core/tx/tiering/manager.h @@ -13,10 +13,11 @@ namespace NTiers { class TManager { private: ui64 TabletId = 0; + YDB_READONLY_DEF(NActors::TActorId, TabletActorId); YDB_READONLY_DEF(TTierConfig, Config); YDB_READONLY_DEF(NActors::TActorId, StorageActorId); public: - TManager(const ui64 tabletId, const TTierConfig& config); + TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config); static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); NOlap::TStorageTier BuildTierStorage() const; @@ -34,6 +35,7 @@ private: class TActor; using TManagers = TMap<TString, NTiers::TManager>; ui64 TabletId = 0; + const TActorId TabletActorId; TString OwnerPath; TActor* Actor = nullptr; YDB_READONLY_DEF(TManagers, Managers); @@ -43,11 +45,13 @@ private: mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation; public: - TTiersManager(const ui64 tabletId, const TString& ownerPath) + TTiersManager(const ui64 tabletId, const TActorId& tabletActorId, const TString& ownerPath) : TabletId(tabletId) + , TabletActorId(tabletActorId) , OwnerPath(ownerPath) { } + TActorId GetActorId() const; ~TTiersManager(); THashMap<ui64, NOlap::TTiersInfo> GetTiering() const; void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot); diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp index d9136104748..5dd253607e3 100644 --- a/ydb/core/tx/tiering/s3_actor.cpp +++ b/ydb/core/tx/tiering/s3_actor.cpp @@ -32,8 +32,8 @@ public: return Event->Blobs; } - TUnifiedBlobId AddExported(const TString& bucket, const TUnifiedBlobId& srcBlob, const ui64 pathId) { - Event->SrcToDstBlobs[srcBlob] = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, bucket, pathId); + TUnifiedBlobId AddExported(const TUnifiedBlobId& srcBlob, const ui64 pathId) { + Event->SrcToDstBlobs[srcBlob] = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, pathId); return Event->SrcToDstBlobs[srcBlob]; } @@ -74,9 +74,9 @@ public: return NKikimrServices::TActivity::TX_COLUMNSHARD_S3_ACTOR; } - TS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName) + TS3Actor(ui64 tabletId, const TActorId& shardActor, const TString& tierName) : TabletId(tabletId) - , ShardActor(parent) + , ShardActor(shardActor) , TierName(tierName) {} @@ -88,16 +88,16 @@ public: void Handle(TEvPrivate::TEvS3Settings::TPtr& ev) { auto& msg = *ev->Get(); auto& endpoint = msg.Settings.GetEndpoint(); - Bucket = msg.Settings.GetBucket(); + const auto& bucket = msg.Settings.GetBucket(); LOG_S_DEBUG("[S3] Update settings for tier '" << TierName << "' endpoint '" << endpoint - << "' bucket '" << Bucket << "' at tablet " << TabletId); + << "' bucket '" << bucket << "' at tablet " << TabletId); if (endpoint.empty()) { LOG_S_ERROR("[S3] No endpoint in settings for tier '" << TierName << "' at tablet " << TabletId); return; } - if (Bucket.empty()) { + if (bucket.empty()) { LOG_S_ERROR("[S3] No bucket in settings for tier '" << TierName << "' at tablet " << TabletId); return; } @@ -113,13 +113,14 @@ public: void Handle(TEvPrivate::TEvExport::TPtr& ev) { auto& msg = *ev->Get(); ui64 exportNo = msg.ExportNo; + Y_VERIFY(ev->Get()->DstActor == ShardActor); Y_VERIFY(!Exports.count(exportNo)); Exports[exportNo] = TS3Export(ev->Release()); auto& ex = Exports[exportNo]; for (auto& [blobId, blob] : ex.Blobs()) { - TString key = ex.AddExported(Bucket, blobId, blob.PathId).GetS3Key(); + TString key = ex.AddExported(blobId, blob.PathId).GetS3Key(); Y_VERIFY(!ExportingKeys.count(key)); // TODO ex.RegisterKey(key); @@ -221,7 +222,11 @@ public: if (!context) { return; } - if (!msg.IsExists()) { + const auto& resultOutcome = msg.Result; + + if (!resultOutcome.IsSuccess()) { + KeyFinished(context->GetKey(), true, LogError("CheckObjectExistsResponse", resultOutcome.GetError(), !!context->GetKey())); + } else if (!msg.IsExists()) { SendPutObject(context->GetKey(), std::move(context->DetachData())); } else { KeyFinished(context->GetKey(), false, ""); @@ -348,7 +353,6 @@ public: auto& ex = it->second; ex.FinishKey(key); - Y_VERIFY(ex.Event->DstActor == ShardActor); if (hasError) { ex.Event->Status = NKikimrProto::ERROR; @@ -370,7 +374,6 @@ private: ui64 TabletId; TActorId ShardActor; TString TierName; - TString Bucket; ui64 ForgetNo{}; THashMap<ui64, TS3Export> Exports; THashMap<ui64, TS3Forget> Forgets; @@ -412,7 +415,6 @@ private: void SendPutObject(const TString& key, TString&& data) const { auto request = Aws::S3::Model::PutObjectRequest() - .WithBucket(Bucket) .WithKey(key) .WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA); #if 0 @@ -425,18 +427,16 @@ private: } void SendPutObjectIfNotExists(const TString& key, TString&& data) { - auto request = Aws::S3::Model::HeadObjectRequest() - .WithBucket(Bucket) - .WithKey(key); + auto request = Aws::S3::Model::ListObjectsRequest() + .WithPrefix(key); - LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId); + LOG_S_DEBUG("[S3] PutObjectIfNotExists->ListObjectsRequest key '" << key << "' at tablet " << TabletId); std::shared_ptr<TEvCheckObjectExistsRequestContext> context = std::make_shared<TEvCheckObjectExistsRequestContext>(key, std::move(data)); Send(ExternalStorageActorId, new TEvExternalStorage::TEvCheckObjectExistsRequest(request, context)); } void SendHeadObject(const TString& key) const { auto request = Aws::S3::Model::HeadObjectRequest() - .WithBucket(Bucket) .WithKey(key); LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId); @@ -446,7 +446,6 @@ private: void SendGetObject(const TString& key, const ui32 startPos, const ui32 size) { Y_VERIFY(size); auto request = Aws::S3::Model::GetObjectRequest() - .WithBucket(Bucket) .WithKey(key) .WithRange(TStringBuilder() << "bytes=" << startPos << "-" << startPos + size - 1); @@ -456,7 +455,6 @@ private: void SendDeleteObject(const TString& key) const { auto request = Aws::S3::Model::DeleteObjectRequest() - .WithBucket(Bucket) .WithKey(key); Send(ExternalStorageActorId, new TEvExternalStorage::TEvDeleteObjectRequest(request)); diff --git a/ydb/core/wrappers/abstract.cpp b/ydb/core/wrappers/abstract.cpp index be0fde71922..13c4f1bdddc 100644 --- a/ydb/core/wrappers/abstract.cpp +++ b/ydb/core/wrappers/abstract.cpp @@ -12,7 +12,7 @@ IExternalStorageOperator::TPtr IExternalStorageConfig::ConstructStorageOperator( IExternalStorageConfig::TPtr IExternalStorageConfig::Construct(const NKikimrSchemeOp::TS3Settings& settings) { if (settings.GetEndpoint() == "fake") { - return std::make_shared<TFakeExternalStorageConfig>(); + return std::make_shared<TFakeExternalStorageConfig>(settings.GetBucket()); } else { return std::make_shared <TS3ExternalStorageConfig>(settings); } diff --git a/ydb/core/wrappers/events/common.h b/ydb/core/wrappers/events/common.h index e051437533c..632e63b2386 100644 --- a/ydb/core/wrappers/events/common.h +++ b/ydb/core/wrappers/events/common.h @@ -13,20 +13,33 @@ namespace NKikimr::NWrappers::NExternalStorage { template <typename TDerived, ui32 EventType, typename T> struct TGenericRequest: public NActors::TEventLocal<TDerived, EventType> { +private: + IRequestContext::TPtr RequestContext; +public: using TRequest = T; TRequest Request; IRequestContext::TPtr GetRequestContext() const { - return nullptr; + return RequestContext; } const TRequest& GetRequest() const { return Request; } + TRequest& MutableRequest() { + return Request; + } + explicit TGenericRequest(const TRequest& request) : Request(request) { } + TGenericRequest(const TRequest& request, IRequestContext::TPtr requestContext) + : RequestContext(requestContext) + , Request(request) + { + } + TString ToString() const override { return TStringBuilder() << this->ToStringHeader() << " {" << " Request: " << Request @@ -57,47 +70,100 @@ struct TRequestWithBody: public TGenericRequest<TDerived, EventType, T> { using TBase = TRequestWithBody<TDerived, EventType, T>; }; -template <typename TDerived, ui32 EventType, typename T, typename U = T> -struct TGenericResponse: public NActors::TEventLocal<TDerived, EventType> { - using TOutcome = Aws::Utils::Outcome<T, Aws::S3::S3Error>; +template <typename TDerived, ui32 EventType, typename TAWSResultExt, typename U = TAWSResultExt> +struct TBaseGenericResponse: public NActors::TEventLocal<TDerived, EventType> { +private: + using TBase = NActors::TEventLocal<TDerived, EventType>; + IRequestContext::TPtr RequestContext; +public: + using TOutcome = Aws::Utils::Outcome<TAWSResultExt, Aws::S3::S3Error>; using TResult = Aws::Utils::Outcome<U, Aws::S3::S3Error>; + using TAWSResult = U; + using TAWSOutcome = TResult; using TKey = std::optional<TString>; - TKey Key; TResult Result; - explicit TGenericResponse(const TKey& key, const TOutcome& outcome) - : Key(key) + explicit TBaseGenericResponse(const TOutcome& outcome) + : Result(TDerived::ResultFromOutcome(outcome)) { + } + + TBaseGenericResponse(const TOutcome& outcome, IRequestContext::TPtr requestContext) + : RequestContext(requestContext) , Result(TDerived::ResultFromOutcome(outcome)) { } + bool IsSuccess() const { + return Result.IsSuccess(); + } + const Aws::S3::S3Error& GetError() const { + return Result.GetError(); + } + const U& GetResult() const { + return Result.GetResult(); + } + + template <class T> + std::shared_ptr<T> GetRequestContextAs() const { + return dynamic_pointer_cast<T>(RequestContext); + } + static TResult ResultFromOutcome(const TOutcome& outcome) { return outcome; } TString ToString() const override { return TStringBuilder() << this->ToStringHeader() << " {" - << " Key: " << (Key ? "null" : *Key) << " Result: " << Result << " }"; } +}; - using TBase = TGenericResponse<TDerived, EventType, T, U>; +template <typename TDerived, ui32 EventType, typename TAWSResult, typename U = TAWSResult> +struct TGenericResponse: public TBaseGenericResponse<TDerived, EventType, TAWSResult, U> { +private: + using TBase = TBaseGenericResponse<TDerived, EventType, TAWSResult, U>; +public: + using TOutcome = typename TBase::TOutcome; + using TResult = typename TBase::TResult; + using TKey = std::optional<TString>; + + TKey Key; + + TGenericResponse(const TKey& key, const TOutcome& outcome) + : TBase(outcome) + , Key(key) { + } + + TGenericResponse(const TKey& key, const TOutcome& outcome, IRequestContext::TPtr requestContext) + : TBase(outcome, requestContext) + , Key(key) + { + } + + TString ToString() const override { + return TStringBuilder() << this->ToStringHeader() << " {" + << " Key: " << (Key ? "null" : *Key) + << " Result: " << TBase::Result + << " }"; + } }; template <typename TDerived, ui32 EventType, typename T, typename U> struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> { - using TGeneric = TGenericResponse<TDerived, EventType, T, U>; - using TKey = typename TGeneric::TKey; +private: + using TBase = TGenericResponse<TDerived, EventType, T, U>; +public: + using TKey = typename TBase::TKey; TString Body; - explicit TResponseWithBody(const TKey& key, const typename TGeneric::TOutcome& outcome) - : TGeneric(key, outcome) { + explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome) + : TBase(key, outcome) { } - explicit TResponseWithBody(const TKey& key, const typename TGeneric::TOutcome& outcome, TString&& body) - : TGeneric(key, outcome) + explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome, TString&& body) + : TBase(key, outcome) , Body(std::move(body)) { } @@ -108,8 +174,6 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> { << " Body: " << Body.size() << "b" << " }"; } - - using TBase = TResponseWithBody<TDerived, EventType, T, U>; }; #define DEFINE_REQUEST(name, base) \ @@ -121,14 +185,21 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> { DEFINE_REQUEST(name, TGenericRequest) #define DECLARE_GENERIC_RESPONSE(name) \ - struct TEv##name##Response: public TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result> + struct TEv##name##Response: public TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result> {\ + private:\ + using TBase = TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result>;\ + public:\ + using TBase::TBase; #define DECLARE_RESPONSE_WITH_BODY(name, result_t) \ - struct TEv##name##Response: public TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t> + struct TEv##name##Response: public TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t> {\ + private:\ + using TBase = TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t>;\ + public:\ + using TBase::TBase; #define DEFINE_GENERIC_RESPONSE(name) \ - DECLARE_GENERIC_RESPONSE(name) { \ - using TBase::TBase; \ + DECLARE_GENERIC_RESPONSE(name) \ } #define DEFINE_GENERIC_REQUEST_RESPONSE(name) \ @@ -136,7 +207,7 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> { DEFINE_GENERIC_RESPONSE(name) DEFINE_GENERIC_REQUEST(GetObject); -DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String) { +DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String) static TResult ResultFromOutcome(const TOutcome & outcome) { if (outcome.IsSuccess()) { return outcome.GetResult().GetETag(); @@ -144,8 +215,6 @@ DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String) { return outcome.GetError(); } } - - using TBase::TBase; }; DEFINE_REQUEST(PutObject, TRequestWithBody); diff --git a/ydb/core/wrappers/events/delete_objects.h b/ydb/core/wrappers/events/delete_objects.h index f4da8e49bbd..f8c6945fb69 100644 --- a/ydb/core/wrappers/events/delete_objects.h +++ b/ydb/core/wrappers/events/delete_objects.h @@ -13,51 +13,18 @@ namespace NKikimr::NWrappers::NExternalStorage { - class TEvDeleteObjectsRequest: public TEventLocal<TEvDeleteObjectsRequest, EvDeleteObjectsRequest> { - public: - using TRequest = Aws::S3::Model::DeleteObjectsRequest; + class TEvDeleteObjectsRequest: public TGenericRequest<TEvDeleteObjectsRequest, EvDeleteObjectsRequest, Aws::S3::Model::DeleteObjectsRequest> { private: - TRequest Request; + using TBase = TGenericRequest<TEvDeleteObjectsRequest, EvDeleteObjectsRequest, Aws::S3::Model::DeleteObjectsRequest>; public: - TEvDeleteObjectsRequest(const TRequest& request) - : Request(request) { - - } - IRequestContext::TPtr GetRequestContext() const { - return nullptr; - } - const TRequest& GetRequest() const { - return Request; - } - TRequest* operator->() { - return &Request; - } + using TBase::TBase; }; - class TEvDeleteObjectsResponse: public TEventLocal<TEvDeleteObjectsResponse, EvDeleteObjectsResponse> { - public: - using TResult = Aws::S3::Model::DeleteObjectsResult; - using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>; - using TKey = std::optional<TString>; + + class TEvDeleteObjectsResponse: public TBaseGenericResponse<TEvDeleteObjectsResponse, EvDeleteObjectsResponse, Aws::S3::Model::DeleteObjectsResult> { private: - TOutcome Outcome; + using TBase = TBaseGenericResponse<TEvDeleteObjectsResponse, EvDeleteObjectsResponse, Aws::S3::Model::DeleteObjectsResult>; public: - TEvDeleteObjectsResponse(const TOutcome& result) - : Outcome(result) - { - - } - bool IsSuccess() const { - return Outcome.IsSuccess(); - } - const Aws::S3::S3Error& GetError() const { - return Outcome.GetError(); - } - const TResult& GetResult() const { - return Outcome.GetResult(); - } - const TResult* operator->() const { - return &Outcome.GetResult(); - } + using TBase::TBase; }; } diff --git a/ydb/core/wrappers/events/list_objects.h b/ydb/core/wrappers/events/list_objects.h index 714296c5574..b2fa83aedee 100644 --- a/ydb/core/wrappers/events/list_objects.h +++ b/ydb/core/wrappers/events/list_objects.h @@ -13,48 +13,18 @@ namespace NKikimr::NWrappers::NExternalStorage { - class TEvListObjectsRequest: public TEventLocal<TEvListObjectsRequest, EvListObjectsRequest> { - public: - using TRequest = Aws::S3::Model::ListObjectsRequest; + class TEvListObjectsRequest: public TGenericRequest<TEvListObjectsRequest, EvListObjectsRequest, Aws::S3::Model::ListObjectsRequest> { private: - TRequest Request; + using TBase = TGenericRequest<TEvListObjectsRequest, EvListObjectsRequest, Aws::S3::Model::ListObjectsRequest>; public: - TEvListObjectsRequest(const TRequest& request) - : Request(request) { - - } - IRequestContext::TPtr GetRequestContext() const { - return nullptr; - } - const TRequest& GetRequest() const { - return Request; - } - TRequest* operator->() { - return &Request; - } + using TBase::TBase; }; - class TEvListObjectsResponse: public TEventLocal<TEvListObjectsResponse, EvListObjectsResponse> { - public: - using TResult = Aws::S3::Model::ListObjectsResult; - using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>; - using TKey = std::optional<TString>; + + class TEvListObjectsResponse: public TBaseGenericResponse<TEvListObjectsResponse, EvListObjectsResponse, Aws::S3::Model::ListObjectsResult> { private: - TOutcome Outcome; + using TBase = TBaseGenericResponse<TEvListObjectsResponse, EvListObjectsResponse, Aws::S3::Model::ListObjectsResult>; public: - TEvListObjectsResponse(const TOutcome& result) - : Outcome(result) - { - - } - bool IsSuccess() const { - return Outcome.IsSuccess(); - } - const TResult& GetResult() const { - return Outcome.GetResult(); - } - const TResult* operator->() const { - return &Outcome.GetResult(); - } + using TBase::TBase; }; } diff --git a/ydb/core/wrappers/events/object_exists.h b/ydb/core/wrappers/events/object_exists.h index 2c6128e507a..d16a06143ef 100644 --- a/ydb/core/wrappers/events/object_exists.h +++ b/ydb/core/wrappers/events/object_exists.h @@ -4,6 +4,7 @@ #include <ydb/core/base/events.h> #include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/library/accessor/accessor.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectResult.h> @@ -12,50 +13,22 @@ namespace NKikimr::NWrappers::NExternalStorage { -class TEvCheckObjectExistsRequest: public TEventLocal<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest> { -public: - using TRequest = Aws::S3::Model::HeadObjectRequest; +class TEvCheckObjectExistsRequest: public TGenericRequest<TEvCheckObjectExistsRequest, + EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest> { private: - TRequest Request; - IRequestContext::TPtr RequestContext; + using TBase = TGenericRequest<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest>; public: - TEvCheckObjectExistsRequest(const TRequest& request, IRequestContext::TPtr requestContext) - : Request(request) - , RequestContext(requestContext) { - - } - IRequestContext::TPtr GetRequestContext() const { - return RequestContext; - } - const TRequest& GetRequest() const { - return Request; - } - TRequest* operator->() { - return &Request; - } + using TBase::TBase; }; -class TEvCheckObjectExistsResponse: public TEventLocal<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse> { -public: - using TRequest = Aws::S3::Model::HeadObjectRequest; - using TResult = Aws::S3::Model::HeadObjectResult; - using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>; - using TKey = std::optional<TString>; +class TEvCheckObjectExistsResponse: public TBaseGenericResponse<TEvCheckObjectExistsResponse, + EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult> { private: - IRequestContext::TPtr RequestContext; - const bool IsExistsFlag; + using TBase = TBaseGenericResponse<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult>; public: - TEvCheckObjectExistsResponse(const TRequest& /*request*/, const TOutcome& outcome, IRequestContext::TPtr requestContext) - : RequestContext(requestContext) - , IsExistsFlag(outcome.IsSuccess() && !outcome.GetResult().GetDeleteMarker()) { - - } + using TBase::TBase; bool IsExists() const { - return IsExistsFlag; - } - template <class T> - std::shared_ptr<T> GetRequestContextAs() const { - return dynamic_pointer_cast<T>(RequestContext); + return Result.IsSuccess() && Result.GetResult().GetContents().size(); } }; } diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp index 147306aeb1b..5db1ee5944f 100644 --- a/ydb/core/wrappers/fake_storage.cpp +++ b/ydb/core/wrappers/fake_storage.cpp @@ -44,13 +44,14 @@ static bool TryParseRange(const TString& str, std::pair<ui64, ui64>& range) { } } -void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { - auto& awsPrefix = ev->Get()->GetRequest().GetPrefix(); +TEvListObjectsResponse::TResult TFakeExternalStorage::BuildListObjectsResult(const TEvListObjectsRequest::TRequest& request) const { + auto& bucket = GetBucket(AwsToString(request.GetBucket())); + auto& awsPrefix = request.GetPrefix(); const TString prefix(awsPrefix.data(), awsPrefix.size()); THolder<TEvListObjectsResponse> result; TGuard<TMutex> g(Mutex); - TEvListObjectsResponse::TResult awsResult; - for (auto&& i : Data) { + TEvListObjectsResponse::TAWSResult awsResult; + for (auto&& i : bucket) { if (!!prefix && !i.first.StartsWith(prefix)) { continue; } @@ -60,31 +61,35 @@ void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { awsResult.AddContents(std::move(objectMeta)); break; } - awsResult.SetIsTruncated(Data.size() > 1); - TEvListObjectsResponse::TOutcome awsOutcome(awsResult); - result = MakeHolder<TEvListObjectsResponse>(awsOutcome); + awsResult.SetIsTruncated(bucket.GetSize() > 1); + return awsResult; +} + +void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { + auto awsResult = BuildListObjectsResult(ev->Get()->GetRequest()); + auto result = MakeHolder<TEvListObjectsResponse>(awsResult); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); - auto& awsKey = ev->Get()->Request.GetKey(); - const TString key(awsKey.data(), awsKey.size()); - auto it = Data.find(key); + auto& bucket = GetBucket(AwsToString(ev->Get()->GetRequest().GetBucket())); + const TString key = AwsToString(ev->Get()->GetRequest().GetKey()); + auto object = bucket.GetObject(key); TString data; - if (it != Data.end()) { + if (!!object) { Aws::S3::Model::GetObjectResult awsResult; - awsResult.SetETag(MD5::Calc(it->second)); - data = it->second; + awsResult.SetETag(MD5::Calc(*object)); + data = *object; - auto awsRange = ev->Get()->Request.GetRange(); + auto awsRange = ev->Get()->GetRequest().GetRange(); if (awsRange.size()) { const TString strRange(awsRange.data(), awsRange.size()); std::pair<ui64, ui64> range; if (!TryParseRange(strRange, range)) { Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome; - THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data))); + THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data))); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); return; } else { @@ -93,41 +98,40 @@ void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const { } Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome(std::move(awsResult)); - THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data))); + THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data))); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } else { Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome; - THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data))); + THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data))); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } } void TFakeExternalStorage::Execute(TEvHeadObjectRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); - auto& awsKey = ev->Get()->Request.GetKey(); - const TString key(awsKey.data(), awsKey.size()); - - auto it = Data.find(key); - if (it != Data.end()) { + auto& bucket = GetBucket(AwsToString(ev->Get()->GetRequest().GetBucket())); + const TString key = AwsToString(ev->Get()->GetRequest().GetKey()); + auto object = bucket.GetObject(key); + if (object) { Aws::S3::Model::HeadObjectResult awsResult; - awsResult.SetETag(MD5::Calc(it->second)); - awsResult.SetContentLength(it->second.size()); + awsResult.SetETag(MD5::Calc(*object)); + awsResult.SetContentLength(object->size()); Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome(awsResult); - THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, awsOutcome)); + THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, std::move(awsOutcome))); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } else { Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome; - THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, awsOutcome)); + THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, std::move(awsOutcome))); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } } void TFakeExternalStorage::Execute(TEvPutObjectRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); + const TString key = AwsToString(ev->Get()->GetRequest().GetKey()); + auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket())); + bucket.PutObject(key, ev->Get()->Body); Aws::S3::Model::PutObjectResult awsResult; - auto& awsKey = ev->Get()->Request.GetKey(); - const TString key(awsKey.data(), awsKey.size()); - Data[key] = ev->Get()->Body; THolder<TEvPutObjectResponse> result(new TEvPutObjectResponse(key, awsResult)); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); @@ -136,9 +140,9 @@ void TFakeExternalStorage::Execute(TEvPutObjectRequest::TPtr& ev) const { void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); Aws::S3::Model::DeleteObjectResult awsResult; - auto& awsKey = ev->Get()->Request.GetKey(); - const TString key(awsKey.data(), awsKey.size()); - Data.erase(key); + auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket())); + const TString key = AwsToString(ev->Get()->GetRequest().GetKey()); + bucket.Remove(key); THolder<TEvDeleteObjectResponse> result(new TEvDeleteObjectResponse(key, awsResult)); TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); @@ -146,10 +150,11 @@ void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { void TFakeExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); + auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket())); Aws::S3::Model::DeleteObjectsResult awsResult; for (auto&& awsKey : ev->Get()->GetRequest().GetDelete().GetObjects()) { - const TString key(awsKey.GetKey().data(), awsKey.GetKey().size()); - Data.erase(key); + const TString key = AwsToString(awsKey.GetKey()); + bucket.Remove(key); Aws::S3::Model::DeletedObject dObject; dObject.WithKey(key); awsResult.AddDeleted(std::move(dObject)); @@ -173,21 +178,9 @@ void TFakeExternalStorage::Execute(TEvAbortMultipartUploadRequest::TPtr& /*ev*/) void TFakeExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const { TGuard<TMutex> g(Mutex); - auto& awsKey = ev->Get()->GetRequest().GetKey(); - const TString key(awsKey.data(), awsKey.size()); - - auto it = Data.find(key); - if (it != Data.end()) { - Aws::S3::Model::HeadObjectResult awsResult; - awsResult.SetETag(MD5::Calc(it->second)); - awsResult.SetContentLength(it->second.size()); - THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(ev->Get()->GetRequest(), awsResult, ev->Get()->GetRequestContext())); - TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); - } else { - Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome; - THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(ev->Get()->GetRequest(), awsOutcome, ev->Get()->GetRequestContext())); - TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); - } + auto awsResult = BuildListObjectsResult(ev->Get()->GetRequest()); + THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(awsResult, ev->Get()->GetRequestContext())); + TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release()); } } diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h index d14a0606b8e..09c23ee7a2a 100644 --- a/ydb/core/wrappers/fake_storage.h +++ b/ydb/core/wrappers/fake_storage.h @@ -13,15 +13,79 @@ #include <util/string/printf.h> namespace NKikimr::NWrappers::NExternalStorage { +class TFakeBucketStorage { +private: + mutable TMutex Mutex; + TMap<TString, TString> Data; +public: + TMap<TString, TString>::const_iterator begin() const { + return Data.begin(); + } + TMap<TString, TString>::const_iterator end() const { + return Data.end(); + } + ui32 GetSize() const { + return Data.size(); + } + void PutObject(const TString& objectId, const TString& data) { + TGuard<TMutex> g(Mutex); + Data[objectId] = data; + } + std::optional<TString> GetObject(const TString& objectId) const { + TGuard<TMutex> g(Mutex); + auto it = Data.find(objectId); + if (it == Data.end()) { + return {}; + } + return it->second; + } + void Remove(const TString& objectId) { + Data.erase(objectId); + } +}; + class TFakeExternalStorage { private: mutable TMutex Mutex; - mutable TMap<TString, TString> Data; + mutable TMap<TString, TFakeBucketStorage> BucketStorages; + TEvListObjectsResponse::TResult BuildListObjectsResult(const TEvListObjectsRequest::TRequest& request) const; + + TString AwsToString(const Aws::String& awsString) const { + TString result(awsString.data(), awsString.size()); + return result; + } public: TFakeExternalStorage() = default; - TMap<TString, TString> GetData() const { + const TFakeBucketStorage& GetBucket(const TString& bucketId) const { + TGuard<TMutex> g(Mutex); + auto it = BucketStorages.find(bucketId); + if (it == BucketStorages.end()) { + it = BucketStorages.emplace(bucketId, TFakeBucketStorage()).first; + } + return it->second; + } + + TFakeBucketStorage& MutableBucket(const TString& bucketId) const { + TGuard<TMutex> g(Mutex); + auto it = BucketStorages.find(bucketId); + if (it == BucketStorages.end()) { + it = BucketStorages.emplace(bucketId, TFakeBucketStorage()).first; + } + return it->second; + } + + ui32 GetSize() const { + ui32 result = 0; TGuard<TMutex> g(Mutex); - return Data; + for (auto&& i : BucketStorages) { + result += i.second.GetSize(); + } + return result; + } + + ui32 GetBucketsCount() const { + TGuard<TMutex> g(Mutex); + return BucketStorages.size(); } void Execute(TEvListObjectsRequest::TPtr& ev) const; void Execute(TEvGetObjectRequest::TPtr& ev) const; @@ -37,39 +101,56 @@ public: }; class TFakeExternalStorageOperator: public IExternalStorageOperator { +private: + const TString Bucket; public: - TFakeExternalStorageOperator() = default; + TFakeExternalStorageOperator(const TString& bucket) + : Bucket(bucket) + { + + } virtual void Execute(TEvCheckObjectExistsRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvListObjectsRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvGetObjectRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvPutObjectRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvUploadPartRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } virtual void Execute(TEvAbortMultipartUploadRequest::TPtr& ev) const override { + ev->Get()->MutableRequest().WithBucket(Bucket); Singleton<TFakeExternalStorage>()->Execute(ev); } }; diff --git a/ydb/core/wrappers/fake_storage_config.cpp b/ydb/core/wrappers/fake_storage_config.cpp index 29da4e8c244..0d2f9650f2a 100644 --- a/ydb/core/wrappers/fake_storage_config.cpp +++ b/ydb/core/wrappers/fake_storage_config.cpp @@ -10,7 +10,7 @@ TString TFakeExternalStorageConfig::DoGetStorageId() const { } IExternalStorageOperator::TPtr TFakeExternalStorageConfig::DoConstructStorageOperator() const { - return std::make_shared<TFakeExternalStorageOperator>(); + return std::make_shared<TFakeExternalStorageOperator>(Bucket); } } diff --git a/ydb/core/wrappers/fake_storage_config.h b/ydb/core/wrappers/fake_storage_config.h index 01a62b81a34..e6ce528a7c5 100644 --- a/ydb/core/wrappers/fake_storage_config.h +++ b/ydb/core/wrappers/fake_storage_config.h @@ -12,10 +12,17 @@ namespace NKikimr::NWrappers::NExternalStorage { class TFakeExternalStorageConfig: public IExternalStorageConfig { +private: + const TString Bucket; protected: virtual TString DoGetStorageId() const override; virtual IExternalStorageOperator::TPtr DoConstructStorageOperator() const override; public: + TFakeExternalStorageConfig(const TString& bucket) + : Bucket(bucket) + { + + } }; } // NKikimr::NWrappers::NExternalStorage diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp index 86cb5a86f21..55912060365 100644 --- a/ydb/core/wrappers/s3_storage.cpp +++ b/ydb/core/wrappers/s3_storage.cpp @@ -120,9 +120,9 @@ private: public: using TBase::Send; using TBase::TBase; - void Reply(const typename TBase::TRequest& request, const typename TBase::TOutcome& outcome) const { + void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const { Y_VERIFY(!std::exchange(TBase::Replied, true), "Double-reply"); - Send(MakeHolder<TEvCheckObjectExistsResponse>(request, outcome, RequestContext).Release()); + Send(MakeHolder<TEvCheckObjectExistsResponse>(outcome, RequestContext).Release()); } }; @@ -224,7 +224,7 @@ public: using TContextBase<TEvRequest, TEvResponse>::TContextBase; const TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { - auto& request = ev->Get()->Request; + auto& request = ev->Get()->MutableRequest(); Buffer = std::move(ev->Get()->Body); request.SetBody(MakeShared<DefaultUnderlyingStream>("StreamContext", @@ -253,7 +253,7 @@ void TS3ExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const { void TS3ExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const { Call<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse, TContextBase>( - ev, &S3Client::HeadObjectAsync); + ev, &S3Client::ListObjectsAsync); } void TS3ExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h index 11009c2171a..c5f239b1a42 100644 --- a/ydb/core/wrappers/s3_storage.h +++ b/ydb/core/wrappers/s3_storage.h @@ -32,6 +32,7 @@ private: THolder<Aws::S3::S3Client> Client; const Aws::Client::ClientConfiguration Config; const Aws::Auth::AWSCredentials Credentials; + const TString Bucket; template <typename TRequest, typename TOutcome> using THandler = std::function<void(const Aws::S3::S3Client*, const TRequest&, const TOutcome&, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)>; @@ -42,6 +43,7 @@ private: template <typename TEvRequest, typename TEvResponse, template <typename...> typename TContext> void Call(typename TEvRequest::TPtr& ev, TFunc<typename TEvRequest::TRequest, typename TEvResponse::TOutcome> func) const { using TCtx = TContext<TEvRequest, TEvResponse>; + ev->Get()->MutableRequest().WithBucket(Bucket); auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender, ev->Get()->GetRequestContext()); auto callback = []( @@ -64,10 +66,12 @@ private: } public: - TS3ExternalStorage(const Aws::Client::ClientConfiguration& config, const Aws::Auth::AWSCredentials& credentials) + TS3ExternalStorage(const Aws::Client::ClientConfiguration& config, const Aws::Auth::AWSCredentials& credentials, const TString& bucket) : Client(new Aws::S3::S3Client(credentials, config)) , Config(config) - , Credentials(credentials) { + , Credentials(credentials) + , Bucket(bucket) + { } ~TS3ExternalStorage(); diff --git a/ydb/core/wrappers/s3_storage_config.cpp b/ydb/core/wrappers/s3_storage_config.cpp index f0ce4964274..0032e45a519 100644 --- a/ydb/core/wrappers/s3_storage_config.cpp +++ b/ydb/core/wrappers/s3_storage_config.cpp @@ -183,8 +183,30 @@ TString TS3ExternalStorageConfig::DoGetStorageId() const { } IExternalStorageOperator::TPtr TS3ExternalStorageConfig::DoConstructStorageOperator() const { - return std::make_shared<TS3ExternalStorage>(Config, Credentials); + return std::make_shared<TS3ExternalStorage>(Config, Credentials, Bucket); } + +TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings): Config(ConfigFromSettings(settings)) +, Credentials(CredentialsFromSettings(settings)) +{ + Bucket = settings.bucket(); +} + +TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, + const Aws::Client::ClientConfiguration& config, const TString& bucket) + : Config(config) + , Credentials(credentials) +{ + Bucket = bucket; +} + +TS3ExternalStorageConfig::TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings) + : Config(ConfigFromSettings(settings)) + , Credentials(CredentialsFromSettings(settings)) +{ + Bucket = settings.GetBucket(); +} + } #endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/wrappers/s3_storage_config.h b/ydb/core/wrappers/s3_storage_config.h index 7ecefab799e..07fb87b9fb3 100644 --- a/ydb/core/wrappers/s3_storage_config.h +++ b/ydb/core/wrappers/s3_storage_config.h @@ -6,6 +6,7 @@ #include <ydb/core/base/events.h> #include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/library/accessor/accessor.h> #include <ydb/public/api/protos/ydb_import.pb.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/auth/AWSCredentials.h> @@ -24,6 +25,7 @@ struct TS3User { class TS3ExternalStorageConfig: public IExternalStorageConfig, TS3User { private: + YDB_READONLY_DEF(TString, Bucket); Aws::Client::ClientConfiguration Config; const Aws::Auth::AWSCredentials Credentials; @@ -44,21 +46,9 @@ public: return Config; } - TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings) - : Config(ConfigFromSettings(settings)) - , Credentials(CredentialsFromSettings(settings)) { - - } - TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings) - : Config(ConfigFromSettings(settings)) - , Credentials(CredentialsFromSettings(settings)) { - - } - TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, const Aws::Client::ClientConfiguration& config) - : Config(config) - , Credentials(credentials) { - - } + TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings); + TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings); + TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, const Aws::Client::ClientConfiguration& config, const TString& bucket); }; } // NKikimr::NWrappers::NExternalStorage diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp index 42474eb605f..84d7479dd63 100644 --- a/ydb/core/wrappers/s3_wrapper.cpp +++ b/ydb/core/wrappers/s3_wrapper.cpp @@ -23,47 +23,9 @@ namespace NKikimr::NWrappers { namespace NExternalStorage { class TS3Wrapper: public TActor<TS3Wrapper> { - void Handle(TEvListObjectsRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvGetObjectRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvCheckObjectExistsRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvHeadObjectRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvPutObjectRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvDeleteObjectRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvDeleteObjectsRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvCreateMultipartUploadRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvUploadPartRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - - void Handle(TEvCompleteMultipartUploadRequest::TPtr& ev) { - CSOperator->Execute(ev); - } - void Handle(TEvAbortMultipartUploadRequest::TPtr& ev) { + template <class T> + void Handle(T& ev) { CSOperator->Execute(ev); } @@ -74,7 +36,9 @@ public: explicit TS3Wrapper(IExternalStorageOperator::TPtr csOperator) : TActor(&TThis::StateWork) - , CSOperator(csOperator) { + , CSOperator(csOperator) + { + Y_VERIFY(!!CSOperator, "not initialized operator. incorrect config."); } virtual ~TS3Wrapper() = default; diff --git a/ydb/core/wrappers/s3_wrapper_ut.cpp b/ydb/core/wrappers/s3_wrapper_ut.cpp index 90724831c92..b864e183010 100644 --- a/ydb/core/wrappers/s3_wrapper_ut.cpp +++ b/ydb/core/wrappers/s3_wrapper_ut.cpp @@ -43,7 +43,7 @@ public: Runtime = MakeHolder<TTestBasicRuntime>(); Runtime->Initialize(TAppPrepare().Unwrap()); Runtime->SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG); - NWrappers::IExternalStorageConfig::TPtr config = std::make_shared<NExternalStorage::TS3ExternalStorageConfig>(Aws::Auth::AWSCredentials(), MakeClientConfig(*Port)); + NWrappers::IExternalStorageConfig::TPtr config = std::make_shared<NExternalStorage::TS3ExternalStorageConfig>(Aws::Auth::AWSCredentials(), MakeClientConfig(*Port), "TEST"); Wrapper = Runtime->Register(NWrappers::CreateS3Wrapper(config->ConstructStorageOperator())); } diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp index 93a41f8c481..6d01ae8ca2c 100644 --- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp +++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp @@ -23,7 +23,7 @@ void TDSAccessorNotifier::Handle(TEvSubscribe::TPtr& ev) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot"; return; } - Send(ev->Get()->GetSubscriberId(), new TEvRefreshSubscriberData(snapshot)); + Sender<TEvRefreshSubscriberData>(snapshot).SendTo(ev->Get()->GetSubscriberId()); } } diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index d75d258450b..faa76a15878 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -20,7 +20,11 @@ void TService::Handle(TEvSubscribeExternal::TPtr& ev) { THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser()); it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first; } - Send<TEvSubscribe>(it->second, ev->Sender); + if (!!ev->Get()->GetSubscriberId()) { + Send<TEvSubscribe>(it->second, ev->Get()->GetSubscriberId()); + } else { + Send<TEvSubscribe>(it->second, ev->Sender); + } } void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h index 72800a06d21..888ec5763b4 100644 --- a/ydb/services/metadata/service.h +++ b/ydb/services/metadata/service.h @@ -7,9 +7,12 @@ namespace NKikimr::NMetadataProvider { class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> { private: YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); + YDB_READONLY_DEF(NActors::TActorId, SubscriberId); public: - TEvSubscribeExternal(ISnapshotParser::TPtr parser) - : SnapshotParser(parser) { + TEvSubscribeExternal(ISnapshotParser::TPtr parser, const NActors::TActorId& subscriberId = {}) + : SnapshotParser(parser) + , SubscriberId(subscriberId) + { Y_VERIFY(!!SnapshotParser); } }; |