aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 13:26:35 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 13:26:35 +0300
commit33e0f47c0e8e26c4778d83646252b59c717a4c7b (patch)
tree0407e2db48571e7f60d427e9fa1fe357fe932a68
parentf570e9e1e98392e10ddfe79909f0ad949f9dec8f (diff)
downloadydb-33e0f47c0e8e26c4778d83646252b59c717a4c7b.tar.gz
bucket is property for external storage config->external storage
improve s3 tests usage use special subscriber id for same mailbox problem resolving in tiering
-rw-r--r--ydb/core/base/events.h2
-rw-r--r--ydb/core/protos/services.proto6
-rw-r--r--ydb/core/tx/columnshard/blob.cpp14
-rw-r--r--ydb/core/tx/columnshard/blob.h14
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp5
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp8
-rw-r--r--ydb/core/tx/datashard/export_s3_base_uploader.h6
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp3
-rw-r--r--ydb/core/tx/tiering/manager.cpp24
-rw-r--r--ydb/core/tx/tiering/manager.h8
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp36
-rw-r--r--ydb/core/wrappers/abstract.cpp2
-rw-r--r--ydb/core/wrappers/events/common.h117
-rw-r--r--ydb/core/wrappers/events/delete_objects.h47
-rw-r--r--ydb/core/wrappers/events/list_objects.h44
-rw-r--r--ydb/core/wrappers/events/object_exists.h47
-rw-r--r--ydb/core/wrappers/fake_storage.cpp91
-rw-r--r--ydb/core/wrappers/fake_storage.h89
-rw-r--r--ydb/core/wrappers/fake_storage_config.cpp2
-rw-r--r--ydb/core/wrappers/fake_storage_config.h7
-rw-r--r--ydb/core/wrappers/s3_storage.cpp8
-rw-r--r--ydb/core/wrappers/s3_storage.h8
-rw-r--r--ydb/core/wrappers/s3_storage_config.cpp24
-rw-r--r--ydb/core/wrappers/s3_storage_config.h20
-rw-r--r--ydb/core/wrappers/s3_wrapper.cpp46
-rw-r--r--ydb/core/wrappers/s3_wrapper_ut.cpp2
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.cpp2
-rw-r--r--ydb/services/metadata/ds_table/service.cpp6
-rw-r--r--ydb/services/metadata/service.h7
30 files changed, 372 insertions, 325 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index e0d00c47671..1b3782b5d47 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -150,6 +150,8 @@ struct TKikimrEvents : TEvents {
ES_DATASHARD_LOAD,
ES_METADATA_PROVIDER,
ES_INTERNAL_REQUEST,
+ ES_BACKGROUND_TASKS,
+ ES_TIERING
};
};
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 4dd801466b4..c01581a8bb6 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -331,6 +331,12 @@ enum EServiceKikimr {
// MetadataProvider
METADATA_PROVIDER = 1500;
+ // Tiering
+ TX_TIERING = 1600;
+
+ // Background tasks
+ BG_TASKS = 1700;
+
};
message TActivity {
diff --git a/ydb/core/tx/columnshard/blob.cpp b/ydb/core/tx/columnshard/blob.cpp
index c7454238bfe..b0771a75f0f 100644
--- a/ydb/core/tx/columnshard/blob.cpp
+++ b/ydb/core/tx/columnshard/blob.cpp
@@ -132,23 +132,15 @@ TUnifiedBlobId ParseSmallBlobId(const TString& s, TString& error) {
return TUnifiedBlobId(tabletId, gen, step, cookie, size);
}
-// Format: "S3_key|bucket"
+// Format: "s = S3_key"
TUnifiedBlobId ParseS3BlobId(const TString& s, TString& error) {
- TVector<TString> keyBucket;
- Split(s, "|", keyBucket);
-
- if (keyBucket.size() != 2) {
- error = TStringBuilder() << "Wrong S3 id '" << s << "'";
- return TUnifiedBlobId();
- }
-
ui64 pathId;
- TUnifiedBlobId dsBlobId = S3KeyToDsId(keyBucket[0], error, pathId);
+ TUnifiedBlobId dsBlobId = S3KeyToDsId(s, error, pathId);
if (!dsBlobId.IsValid()) {
return TUnifiedBlobId();
}
- return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, keyBucket[1], pathId);
+ return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, pathId);
}
}
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index 00228e7071c..c0f0e723531 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -87,13 +87,11 @@ class TUnifiedBlobId {
struct TS3BlobId {
TDsBlobId DsBlobId;
- TString Bucket;
TString Key;
TS3BlobId() = default;
- TS3BlobId(const TUnifiedBlobId& dsBlob, const TString& bucket, const ui64 pathId)
- : Bucket(bucket)
+ TS3BlobId(const TUnifiedBlobId& dsBlob, const ui64 pathId)
{
Y_VERIFY(dsBlob.IsDsBlob());
DsBlobId = std::get<TDsBlobId>(dsBlob.Id);
@@ -101,15 +99,15 @@ class TUnifiedBlobId {
}
bool operator == (const TS3BlobId& other) const {
- return Bucket == other.Bucket && Key == other.Key;
+ return Key == other.Key;
}
TString ToStringNew() const {
- return Sprintf("%s|%s", Key.c_str(), Bucket.c_str());
+ return Sprintf("%s", Key.c_str());
}
ui64 Hash() const {
- return CombineHashes<ui64>(THash<TString>()(Bucket), THash<TString>()(Key));
+ return IntHash(THash<TString>()(Key));
}
};
@@ -143,8 +141,8 @@ public:
{}
// Make S3 blob Id from DS one
- TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const TString& bucket, const ui64 pathId)
- : Id(TS3BlobId(blob, bucket, pathId))
+ TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const ui64 pathId)
+ : Id(TS3BlobId(blob, pathId))
{
Y_VERIFY(type == S3_BLOB);
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 6b672049d45..0cbfb6079ee 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/tx/tiering/external_data.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/services/metadata/service.h>
+#include <ydb/core/tx/tiering/manager.h>
namespace NKikimr::NColumnShard {
@@ -951,7 +952,7 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
EnableTiering = schema.GetEnableTiering();
if (OwnerPath && !Tiers && EnableTiering) {
- Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath);
+ Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath);
}
if (!!Tiers) {
if (EnableTiering) {
@@ -1044,7 +1045,7 @@ TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const {
void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
if (!Tiers) {
- Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath);
+ Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath);
}
Tiers->TakeConfigs(ev->Get()->GetSnapshot());
if (EnableTiering) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 9cf3766c597..7dd77b12ed2 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -595,7 +595,7 @@ void TestTwoHotTiers(bool reboot) {
}
void TestHotAndColdTiers(bool reboot) {
- TString bucket = "ydb";
+ const TString bucket = "tiering-test-01";
TPortManager portManager;
const ui16 port = portManager.GetPort();
@@ -612,9 +612,10 @@ void TestHotAndColdTiers(bool reboot) {
s3Config.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP);
s3Config.SetVerifySSL(false);
-#if 0
+ s3Config.SetBucket(bucket);
+//#define S3_TEST_USAGE
+#ifdef S3_TEST_USAGE
s3Config.SetEndpoint("storage.cloud-preprod.yandex.net");
- s3Config.SetBucket("tiering-test-01");
s3Config.SetAccessKey("...");
s3Config.SetSecretKey("...");
s3Config.SetProxyHost("localhost");
@@ -622,7 +623,6 @@ void TestHotAndColdTiers(bool reboot) {
s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP);
#else
s3Config.SetEndpoint("fake");
- s3Config.SetBucket(bucket);
#endif
s3Config.SetRequestTimeoutMs(10000);
s3Config.SetHttpRequestTimeoutMs(10000);
diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h
index 1acce848bc4..8ebc1a79ffc 100644
--- a/ydb/core/tx/datashard/export_s3_base_uploader.h
+++ b/ydb/core/tx/datashard/export_s3_base_uploader.h
@@ -79,7 +79,6 @@ protected:
google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
auto request = Aws::S3::Model::PutObjectRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetSchemeKey())
.WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
@@ -145,7 +144,6 @@ protected:
void UploadData() {
if (!MultiPart) {
auto request = Aws::S3::Model::PutObjectRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
@@ -156,7 +154,6 @@ protected:
}
auto request = Aws::S3::Model::UploadPartRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId)
.WithPartNumber(Parts.size() + 1);
@@ -187,7 +184,6 @@ protected:
if (!upload) {
auto request = Aws::S3::Model::CreateMultipartUploadRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request));
@@ -209,7 +205,6 @@ protected:
}
auto request = Aws::S3::Model::CompleteMultipartUploadRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId)
.WithMultipartUpload(Aws::S3::Model::CompletedMultipartUpload().WithParts(std::move(parts)));
@@ -224,7 +219,6 @@ protected:
}
auto request = Aws::S3::Model::AbortMultipartUploadRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId);
this->Send(Client, new TEvExternalStorage::TEvAbortMultipartUploadRequest(request));
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index ba4f23c09d0..cb91713f93a 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -357,7 +357,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ": key# " << key);
auto request = Model::HeadObjectRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(key);
Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
@@ -369,7 +368,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", range# " << range.first << "-" << range.second);
auto request = Model::GetObjectRequest()
- .WithBucket(Settings.GetBucket())
.WithKey(key)
.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
index a95794cdcf9..5819f9f5567 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
@@ -29,7 +29,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
void HeadObject(const TString& key) {
auto request = Model::HeadObjectRequest()
- .WithBucket(ImportInfo->Settings.bucket())
.WithKey(key);
Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
@@ -52,7 +51,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
auto request = Model::GetObjectRequest()
- .WithBucket(ImportInfo->Settings.bucket())
.WithKey(key)
.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
@@ -137,7 +135,6 @@ public:
if (Client) {
Send(Client, new TEvents::TEvPoisonPill());
}
-
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
HeadObject(SchemeKey);
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index 114b634406a..a9cd24a2c75 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -30,7 +30,7 @@ public:
}
void Bootstrap() {
Become(&TThis::StateMain);
- Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation()));
+ Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation(), SelfId()));
}
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
auto snapshot = ev->Get()->GetSnapshot();
@@ -79,7 +79,7 @@ bool TManager::Start() {
#ifndef KIKIMR_DISABLE_S3_OPS
auto& ctx = TActivationContext::AsActorContext();
const NActors::TActorId newActor = ctx.Register(
- CreateS3Actor(TabletId, ctx.SelfID, Config.GetTierName())
+ CreateS3Actor(TabletId, TabletActorId, Config.GetTierName())
);
ctx.Send(newActor, new TEvPrivate::TEvS3Settings(Config.GetProtoConfig().GetObjectStorage()));
Stop();
@@ -88,9 +88,11 @@ bool TManager::Start() {
return true;
}
-TManager::TManager(const ui64 tabletId, const TTierConfig& config)
+TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config)
: TabletId(tabletId)
- , Config(config) {
+ , TabletActorId(tabletActorId)
+ , Config(config)
+{
}
NOlap::TStorageTier TManager::BuildTierStorage() const {
@@ -147,7 +149,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
if (Managers.contains(i.second.GetTierName())) {
continue;
}
- NTiers::TManager localManager(TabletId, i.second);
+ NTiers::TManager localManager(TabletId, TabletActorId, i.second);
auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second;
if (IsActive()) {
manager.Start();
@@ -158,12 +160,12 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
TActorId TTiersManager::GetStorageActorId(const TString& tierName) {
auto it = Managers.find(tierName);
if (it == Managers.end()) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId;
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId;
return {};
}
auto actorId = it->second.GetStorageActorId();
if (!actorId) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId;
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId;
return {};
}
return actorId;
@@ -234,4 +236,12 @@ TTiersManager::~TTiersManager() {
}
}
+TActorId TTiersManager::GetActorId() const {
+ if (Actor) {
+ return Actor->SelfId();
+ } else {
+ return {};
+ }
+}
+
}
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index 4313aefce42..008682a8917 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -13,10 +13,11 @@ namespace NTiers {
class TManager {
private:
ui64 TabletId = 0;
+ YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
YDB_READONLY_DEF(TTierConfig, Config);
YDB_READONLY_DEF(NActors::TActorId, StorageActorId);
public:
- TManager(const ui64 tabletId, const TTierConfig& config);
+ TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config);
static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
NOlap::TStorageTier BuildTierStorage() const;
@@ -34,6 +35,7 @@ private:
class TActor;
using TManagers = TMap<TString, NTiers::TManager>;
ui64 TabletId = 0;
+ const TActorId TabletActorId;
TString OwnerPath;
TActor* Actor = nullptr;
YDB_READONLY_DEF(TManagers, Managers);
@@ -43,11 +45,13 @@ private:
mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation;
public:
- TTiersManager(const ui64 tabletId, const TString& ownerPath)
+ TTiersManager(const ui64 tabletId, const TActorId& tabletActorId, const TString& ownerPath)
: TabletId(tabletId)
+ , TabletActorId(tabletActorId)
, OwnerPath(ownerPath)
{
}
+ TActorId GetActorId() const;
~TTiersManager();
THashMap<ui64, NOlap::TTiersInfo> GetTiering() const;
void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot);
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index d9136104748..5dd253607e3 100644
--- a/ydb/core/tx/tiering/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -32,8 +32,8 @@ public:
return Event->Blobs;
}
- TUnifiedBlobId AddExported(const TString& bucket, const TUnifiedBlobId& srcBlob, const ui64 pathId) {
- Event->SrcToDstBlobs[srcBlob] = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, bucket, pathId);
+ TUnifiedBlobId AddExported(const TUnifiedBlobId& srcBlob, const ui64 pathId) {
+ Event->SrcToDstBlobs[srcBlob] = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, pathId);
return Event->SrcToDstBlobs[srcBlob];
}
@@ -74,9 +74,9 @@ public:
return NKikimrServices::TActivity::TX_COLUMNSHARD_S3_ACTOR;
}
- TS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName)
+ TS3Actor(ui64 tabletId, const TActorId& shardActor, const TString& tierName)
: TabletId(tabletId)
- , ShardActor(parent)
+ , ShardActor(shardActor)
, TierName(tierName)
{}
@@ -88,16 +88,16 @@ public:
void Handle(TEvPrivate::TEvS3Settings::TPtr& ev) {
auto& msg = *ev->Get();
auto& endpoint = msg.Settings.GetEndpoint();
- Bucket = msg.Settings.GetBucket();
+ const auto& bucket = msg.Settings.GetBucket();
LOG_S_DEBUG("[S3] Update settings for tier '" << TierName << "' endpoint '" << endpoint
- << "' bucket '" << Bucket << "' at tablet " << TabletId);
+ << "' bucket '" << bucket << "' at tablet " << TabletId);
if (endpoint.empty()) {
LOG_S_ERROR("[S3] No endpoint in settings for tier '" << TierName << "' at tablet " << TabletId);
return;
}
- if (Bucket.empty()) {
+ if (bucket.empty()) {
LOG_S_ERROR("[S3] No bucket in settings for tier '" << TierName << "' at tablet " << TabletId);
return;
}
@@ -113,13 +113,14 @@ public:
void Handle(TEvPrivate::TEvExport::TPtr& ev) {
auto& msg = *ev->Get();
ui64 exportNo = msg.ExportNo;
+ Y_VERIFY(ev->Get()->DstActor == ShardActor);
Y_VERIFY(!Exports.count(exportNo));
Exports[exportNo] = TS3Export(ev->Release());
auto& ex = Exports[exportNo];
for (auto& [blobId, blob] : ex.Blobs()) {
- TString key = ex.AddExported(Bucket, blobId, blob.PathId).GetS3Key();
+ TString key = ex.AddExported(blobId, blob.PathId).GetS3Key();
Y_VERIFY(!ExportingKeys.count(key)); // TODO
ex.RegisterKey(key);
@@ -221,7 +222,11 @@ public:
if (!context) {
return;
}
- if (!msg.IsExists()) {
+ const auto& resultOutcome = msg.Result;
+
+ if (!resultOutcome.IsSuccess()) {
+ KeyFinished(context->GetKey(), true, LogError("CheckObjectExistsResponse", resultOutcome.GetError(), !!context->GetKey()));
+ } else if (!msg.IsExists()) {
SendPutObject(context->GetKey(), std::move(context->DetachData()));
} else {
KeyFinished(context->GetKey(), false, "");
@@ -348,7 +353,6 @@ public:
auto& ex = it->second;
ex.FinishKey(key);
- Y_VERIFY(ex.Event->DstActor == ShardActor);
if (hasError) {
ex.Event->Status = NKikimrProto::ERROR;
@@ -370,7 +374,6 @@ private:
ui64 TabletId;
TActorId ShardActor;
TString TierName;
- TString Bucket;
ui64 ForgetNo{};
THashMap<ui64, TS3Export> Exports;
THashMap<ui64, TS3Forget> Forgets;
@@ -412,7 +415,6 @@ private:
void SendPutObject(const TString& key, TString&& data) const {
auto request = Aws::S3::Model::PutObjectRequest()
- .WithBucket(Bucket)
.WithKey(key)
.WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA);
#if 0
@@ -425,18 +427,16 @@ private:
}
void SendPutObjectIfNotExists(const TString& key, TString&& data) {
- auto request = Aws::S3::Model::HeadObjectRequest()
- .WithBucket(Bucket)
- .WithKey(key);
+ auto request = Aws::S3::Model::ListObjectsRequest()
+ .WithPrefix(key);
- LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId);
+ LOG_S_DEBUG("[S3] PutObjectIfNotExists->ListObjectsRequest key '" << key << "' at tablet " << TabletId);
std::shared_ptr<TEvCheckObjectExistsRequestContext> context = std::make_shared<TEvCheckObjectExistsRequestContext>(key, std::move(data));
Send(ExternalStorageActorId, new TEvExternalStorage::TEvCheckObjectExistsRequest(request, context));
}
void SendHeadObject(const TString& key) const {
auto request = Aws::S3::Model::HeadObjectRequest()
- .WithBucket(Bucket)
.WithKey(key);
LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId);
@@ -446,7 +446,6 @@ private:
void SendGetObject(const TString& key, const ui32 startPos, const ui32 size) {
Y_VERIFY(size);
auto request = Aws::S3::Model::GetObjectRequest()
- .WithBucket(Bucket)
.WithKey(key)
.WithRange(TStringBuilder() << "bytes=" << startPos << "-" << startPos + size - 1);
@@ -456,7 +455,6 @@ private:
void SendDeleteObject(const TString& key) const {
auto request = Aws::S3::Model::DeleteObjectRequest()
- .WithBucket(Bucket)
.WithKey(key);
Send(ExternalStorageActorId, new TEvExternalStorage::TEvDeleteObjectRequest(request));
diff --git a/ydb/core/wrappers/abstract.cpp b/ydb/core/wrappers/abstract.cpp
index be0fde71922..13c4f1bdddc 100644
--- a/ydb/core/wrappers/abstract.cpp
+++ b/ydb/core/wrappers/abstract.cpp
@@ -12,7 +12,7 @@ IExternalStorageOperator::TPtr IExternalStorageConfig::ConstructStorageOperator(
IExternalStorageConfig::TPtr IExternalStorageConfig::Construct(const NKikimrSchemeOp::TS3Settings& settings) {
if (settings.GetEndpoint() == "fake") {
- return std::make_shared<TFakeExternalStorageConfig>();
+ return std::make_shared<TFakeExternalStorageConfig>(settings.GetBucket());
} else {
return std::make_shared <TS3ExternalStorageConfig>(settings);
}
diff --git a/ydb/core/wrappers/events/common.h b/ydb/core/wrappers/events/common.h
index e051437533c..632e63b2386 100644
--- a/ydb/core/wrappers/events/common.h
+++ b/ydb/core/wrappers/events/common.h
@@ -13,20 +13,33 @@ namespace NKikimr::NWrappers::NExternalStorage {
template <typename TDerived, ui32 EventType, typename T>
struct TGenericRequest: public NActors::TEventLocal<TDerived, EventType> {
+private:
+ IRequestContext::TPtr RequestContext;
+public:
using TRequest = T;
TRequest Request;
IRequestContext::TPtr GetRequestContext() const {
- return nullptr;
+ return RequestContext;
}
const TRequest& GetRequest() const {
return Request;
}
+ TRequest& MutableRequest() {
+ return Request;
+ }
+
explicit TGenericRequest(const TRequest& request)
: Request(request) {
}
+ TGenericRequest(const TRequest& request, IRequestContext::TPtr requestContext)
+ : RequestContext(requestContext)
+ , Request(request)
+ {
+ }
+
TString ToString() const override {
return TStringBuilder() << this->ToStringHeader() << " {"
<< " Request: " << Request
@@ -57,47 +70,100 @@ struct TRequestWithBody: public TGenericRequest<TDerived, EventType, T> {
using TBase = TRequestWithBody<TDerived, EventType, T>;
};
-template <typename TDerived, ui32 EventType, typename T, typename U = T>
-struct TGenericResponse: public NActors::TEventLocal<TDerived, EventType> {
- using TOutcome = Aws::Utils::Outcome<T, Aws::S3::S3Error>;
+template <typename TDerived, ui32 EventType, typename TAWSResultExt, typename U = TAWSResultExt>
+struct TBaseGenericResponse: public NActors::TEventLocal<TDerived, EventType> {
+private:
+ using TBase = NActors::TEventLocal<TDerived, EventType>;
+ IRequestContext::TPtr RequestContext;
+public:
+ using TOutcome = Aws::Utils::Outcome<TAWSResultExt, Aws::S3::S3Error>;
using TResult = Aws::Utils::Outcome<U, Aws::S3::S3Error>;
+ using TAWSResult = U;
+ using TAWSOutcome = TResult;
using TKey = std::optional<TString>;
- TKey Key;
TResult Result;
- explicit TGenericResponse(const TKey& key, const TOutcome& outcome)
- : Key(key)
+ explicit TBaseGenericResponse(const TOutcome& outcome)
+ : Result(TDerived::ResultFromOutcome(outcome)) {
+ }
+
+ TBaseGenericResponse(const TOutcome& outcome, IRequestContext::TPtr requestContext)
+ : RequestContext(requestContext)
, Result(TDerived::ResultFromOutcome(outcome)) {
}
+ bool IsSuccess() const {
+ return Result.IsSuccess();
+ }
+ const Aws::S3::S3Error& GetError() const {
+ return Result.GetError();
+ }
+ const U& GetResult() const {
+ return Result.GetResult();
+ }
+
+ template <class T>
+ std::shared_ptr<T> GetRequestContextAs() const {
+ return dynamic_pointer_cast<T>(RequestContext);
+ }
+
static TResult ResultFromOutcome(const TOutcome& outcome) {
return outcome;
}
TString ToString() const override {
return TStringBuilder() << this->ToStringHeader() << " {"
- << " Key: " << (Key ? "null" : *Key)
<< " Result: " << Result
<< " }";
}
+};
- using TBase = TGenericResponse<TDerived, EventType, T, U>;
+template <typename TDerived, ui32 EventType, typename TAWSResult, typename U = TAWSResult>
+struct TGenericResponse: public TBaseGenericResponse<TDerived, EventType, TAWSResult, U> {
+private:
+ using TBase = TBaseGenericResponse<TDerived, EventType, TAWSResult, U>;
+public:
+ using TOutcome = typename TBase::TOutcome;
+ using TResult = typename TBase::TResult;
+ using TKey = std::optional<TString>;
+
+ TKey Key;
+
+ TGenericResponse(const TKey& key, const TOutcome& outcome)
+ : TBase(outcome)
+ , Key(key) {
+ }
+
+ TGenericResponse(const TKey& key, const TOutcome& outcome, IRequestContext::TPtr requestContext)
+ : TBase(outcome, requestContext)
+ , Key(key)
+ {
+ }
+
+ TString ToString() const override {
+ return TStringBuilder() << this->ToStringHeader() << " {"
+ << " Key: " << (Key ? "null" : *Key)
+ << " Result: " << TBase::Result
+ << " }";
+ }
};
template <typename TDerived, ui32 EventType, typename T, typename U>
struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> {
- using TGeneric = TGenericResponse<TDerived, EventType, T, U>;
- using TKey = typename TGeneric::TKey;
+private:
+ using TBase = TGenericResponse<TDerived, EventType, T, U>;
+public:
+ using TKey = typename TBase::TKey;
TString Body;
- explicit TResponseWithBody(const TKey& key, const typename TGeneric::TOutcome& outcome)
- : TGeneric(key, outcome) {
+ explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome)
+ : TBase(key, outcome) {
}
- explicit TResponseWithBody(const TKey& key, const typename TGeneric::TOutcome& outcome, TString&& body)
- : TGeneric(key, outcome)
+ explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome, TString&& body)
+ : TBase(key, outcome)
, Body(std::move(body)) {
}
@@ -108,8 +174,6 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> {
<< " Body: " << Body.size() << "b"
<< " }";
}
-
- using TBase = TResponseWithBody<TDerived, EventType, T, U>;
};
#define DEFINE_REQUEST(name, base) \
@@ -121,14 +185,21 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> {
DEFINE_REQUEST(name, TGenericRequest)
#define DECLARE_GENERIC_RESPONSE(name) \
- struct TEv##name##Response: public TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result>
+ struct TEv##name##Response: public TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result> {\
+ private:\
+ using TBase = TGenericResponse<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result>;\
+ public:\
+ using TBase::TBase;
#define DECLARE_RESPONSE_WITH_BODY(name, result_t) \
- struct TEv##name##Response: public TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t>
+ struct TEv##name##Response: public TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t> {\
+ private:\
+ using TBase = TResponseWithBody<TEv##name##Response, Ev##name##Response, Aws::S3::Model::name##Result, result_t>;\
+ public:\
+ using TBase::TBase;
#define DEFINE_GENERIC_RESPONSE(name) \
- DECLARE_GENERIC_RESPONSE(name) { \
- using TBase::TBase; \
+ DECLARE_GENERIC_RESPONSE(name) \
}
#define DEFINE_GENERIC_REQUEST_RESPONSE(name) \
@@ -136,7 +207,7 @@ struct TResponseWithBody: public TGenericResponse<TDerived, EventType, T, U> {
DEFINE_GENERIC_RESPONSE(name)
DEFINE_GENERIC_REQUEST(GetObject);
-DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String) {
+DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String)
static TResult ResultFromOutcome(const TOutcome & outcome) {
if (outcome.IsSuccess()) {
return outcome.GetResult().GetETag();
@@ -144,8 +215,6 @@ DECLARE_RESPONSE_WITH_BODY(GetObject, Aws::String) {
return outcome.GetError();
}
}
-
- using TBase::TBase;
};
DEFINE_REQUEST(PutObject, TRequestWithBody);
diff --git a/ydb/core/wrappers/events/delete_objects.h b/ydb/core/wrappers/events/delete_objects.h
index f4da8e49bbd..f8c6945fb69 100644
--- a/ydb/core/wrappers/events/delete_objects.h
+++ b/ydb/core/wrappers/events/delete_objects.h
@@ -13,51 +13,18 @@
namespace NKikimr::NWrappers::NExternalStorage {
- class TEvDeleteObjectsRequest: public TEventLocal<TEvDeleteObjectsRequest, EvDeleteObjectsRequest> {
- public:
- using TRequest = Aws::S3::Model::DeleteObjectsRequest;
+ class TEvDeleteObjectsRequest: public TGenericRequest<TEvDeleteObjectsRequest, EvDeleteObjectsRequest, Aws::S3::Model::DeleteObjectsRequest> {
private:
- TRequest Request;
+ using TBase = TGenericRequest<TEvDeleteObjectsRequest, EvDeleteObjectsRequest, Aws::S3::Model::DeleteObjectsRequest>;
public:
- TEvDeleteObjectsRequest(const TRequest& request)
- : Request(request) {
-
- }
- IRequestContext::TPtr GetRequestContext() const {
- return nullptr;
- }
- const TRequest& GetRequest() const {
- return Request;
- }
- TRequest* operator->() {
- return &Request;
- }
+ using TBase::TBase;
};
- class TEvDeleteObjectsResponse: public TEventLocal<TEvDeleteObjectsResponse, EvDeleteObjectsResponse> {
- public:
- using TResult = Aws::S3::Model::DeleteObjectsResult;
- using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>;
- using TKey = std::optional<TString>;
+
+ class TEvDeleteObjectsResponse: public TBaseGenericResponse<TEvDeleteObjectsResponse, EvDeleteObjectsResponse, Aws::S3::Model::DeleteObjectsResult> {
private:
- TOutcome Outcome;
+ using TBase = TBaseGenericResponse<TEvDeleteObjectsResponse, EvDeleteObjectsResponse, Aws::S3::Model::DeleteObjectsResult>;
public:
- TEvDeleteObjectsResponse(const TOutcome& result)
- : Outcome(result)
- {
-
- }
- bool IsSuccess() const {
- return Outcome.IsSuccess();
- }
- const Aws::S3::S3Error& GetError() const {
- return Outcome.GetError();
- }
- const TResult& GetResult() const {
- return Outcome.GetResult();
- }
- const TResult* operator->() const {
- return &Outcome.GetResult();
- }
+ using TBase::TBase;
};
}
diff --git a/ydb/core/wrappers/events/list_objects.h b/ydb/core/wrappers/events/list_objects.h
index 714296c5574..b2fa83aedee 100644
--- a/ydb/core/wrappers/events/list_objects.h
+++ b/ydb/core/wrappers/events/list_objects.h
@@ -13,48 +13,18 @@
namespace NKikimr::NWrappers::NExternalStorage {
- class TEvListObjectsRequest: public TEventLocal<TEvListObjectsRequest, EvListObjectsRequest> {
- public:
- using TRequest = Aws::S3::Model::ListObjectsRequest;
+ class TEvListObjectsRequest: public TGenericRequest<TEvListObjectsRequest, EvListObjectsRequest, Aws::S3::Model::ListObjectsRequest> {
private:
- TRequest Request;
+ using TBase = TGenericRequest<TEvListObjectsRequest, EvListObjectsRequest, Aws::S3::Model::ListObjectsRequest>;
public:
- TEvListObjectsRequest(const TRequest& request)
- : Request(request) {
-
- }
- IRequestContext::TPtr GetRequestContext() const {
- return nullptr;
- }
- const TRequest& GetRequest() const {
- return Request;
- }
- TRequest* operator->() {
- return &Request;
- }
+ using TBase::TBase;
};
- class TEvListObjectsResponse: public TEventLocal<TEvListObjectsResponse, EvListObjectsResponse> {
- public:
- using TResult = Aws::S3::Model::ListObjectsResult;
- using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>;
- using TKey = std::optional<TString>;
+
+ class TEvListObjectsResponse: public TBaseGenericResponse<TEvListObjectsResponse, EvListObjectsResponse, Aws::S3::Model::ListObjectsResult> {
private:
- TOutcome Outcome;
+ using TBase = TBaseGenericResponse<TEvListObjectsResponse, EvListObjectsResponse, Aws::S3::Model::ListObjectsResult>;
public:
- TEvListObjectsResponse(const TOutcome& result)
- : Outcome(result)
- {
-
- }
- bool IsSuccess() const {
- return Outcome.IsSuccess();
- }
- const TResult& GetResult() const {
- return Outcome.GetResult();
- }
- const TResult* operator->() const {
- return &Outcome.GetResult();
- }
+ using TBase::TBase;
};
}
diff --git a/ydb/core/wrappers/events/object_exists.h b/ydb/core/wrappers/events/object_exists.h
index 2c6128e507a..d16a06143ef 100644
--- a/ydb/core/wrappers/events/object_exists.h
+++ b/ydb/core/wrappers/events/object_exists.h
@@ -4,6 +4,7 @@
#include <ydb/core/base/events.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/library/accessor/accessor.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectResult.h>
@@ -12,50 +13,22 @@
namespace NKikimr::NWrappers::NExternalStorage {
-class TEvCheckObjectExistsRequest: public TEventLocal<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest> {
-public:
- using TRequest = Aws::S3::Model::HeadObjectRequest;
+class TEvCheckObjectExistsRequest: public TGenericRequest<TEvCheckObjectExistsRequest,
+ EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest> {
private:
- TRequest Request;
- IRequestContext::TPtr RequestContext;
+ using TBase = TGenericRequest<TEvCheckObjectExistsRequest, EvCheckObjectExistsRequest, Aws::S3::Model::ListObjectsRequest>;
public:
- TEvCheckObjectExistsRequest(const TRequest& request, IRequestContext::TPtr requestContext)
- : Request(request)
- , RequestContext(requestContext) {
-
- }
- IRequestContext::TPtr GetRequestContext() const {
- return RequestContext;
- }
- const TRequest& GetRequest() const {
- return Request;
- }
- TRequest* operator->() {
- return &Request;
- }
+ using TBase::TBase;
};
-class TEvCheckObjectExistsResponse: public TEventLocal<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse> {
-public:
- using TRequest = Aws::S3::Model::HeadObjectRequest;
- using TResult = Aws::S3::Model::HeadObjectResult;
- using TOutcome = Aws::Utils::Outcome<TResult, Aws::S3::S3Error>;
- using TKey = std::optional<TString>;
+class TEvCheckObjectExistsResponse: public TBaseGenericResponse<TEvCheckObjectExistsResponse,
+ EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult> {
private:
- IRequestContext::TPtr RequestContext;
- const bool IsExistsFlag;
+ using TBase = TBaseGenericResponse<TEvCheckObjectExistsResponse, EvCheckObjectExistsResponse, Aws::S3::Model::ListObjectsResult>;
public:
- TEvCheckObjectExistsResponse(const TRequest& /*request*/, const TOutcome& outcome, IRequestContext::TPtr requestContext)
- : RequestContext(requestContext)
- , IsExistsFlag(outcome.IsSuccess() && !outcome.GetResult().GetDeleteMarker()) {
-
- }
+ using TBase::TBase;
bool IsExists() const {
- return IsExistsFlag;
- }
- template <class T>
- std::shared_ptr<T> GetRequestContextAs() const {
- return dynamic_pointer_cast<T>(RequestContext);
+ return Result.IsSuccess() && Result.GetResult().GetContents().size();
}
};
}
diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp
index 147306aeb1b..5db1ee5944f 100644
--- a/ydb/core/wrappers/fake_storage.cpp
+++ b/ydb/core/wrappers/fake_storage.cpp
@@ -44,13 +44,14 @@ static bool TryParseRange(const TString& str, std::pair<ui64, ui64>& range) {
}
}
-void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {
- auto& awsPrefix = ev->Get()->GetRequest().GetPrefix();
+TEvListObjectsResponse::TResult TFakeExternalStorage::BuildListObjectsResult(const TEvListObjectsRequest::TRequest& request) const {
+ auto& bucket = GetBucket(AwsToString(request.GetBucket()));
+ auto& awsPrefix = request.GetPrefix();
const TString prefix(awsPrefix.data(), awsPrefix.size());
THolder<TEvListObjectsResponse> result;
TGuard<TMutex> g(Mutex);
- TEvListObjectsResponse::TResult awsResult;
- for (auto&& i : Data) {
+ TEvListObjectsResponse::TAWSResult awsResult;
+ for (auto&& i : bucket) {
if (!!prefix && !i.first.StartsWith(prefix)) {
continue;
}
@@ -60,31 +61,35 @@ void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {
awsResult.AddContents(std::move(objectMeta));
break;
}
- awsResult.SetIsTruncated(Data.size() > 1);
- TEvListObjectsResponse::TOutcome awsOutcome(awsResult);
- result = MakeHolder<TEvListObjectsResponse>(awsOutcome);
+ awsResult.SetIsTruncated(bucket.GetSize() > 1);
+ return awsResult;
+}
+
+void TFakeExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {
+ auto awsResult = BuildListObjectsResult(ev->Get()->GetRequest());
+ auto result = MakeHolder<TEvListObjectsResponse>(awsResult);
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
- auto& awsKey = ev->Get()->Request.GetKey();
- const TString key(awsKey.data(), awsKey.size());
- auto it = Data.find(key);
+ auto& bucket = GetBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
+ const TString key = AwsToString(ev->Get()->GetRequest().GetKey());
+ auto object = bucket.GetObject(key);
TString data;
- if (it != Data.end()) {
+ if (!!object) {
Aws::S3::Model::GetObjectResult awsResult;
- awsResult.SetETag(MD5::Calc(it->second));
- data = it->second;
+ awsResult.SetETag(MD5::Calc(*object));
+ data = *object;
- auto awsRange = ev->Get()->Request.GetRange();
+ auto awsRange = ev->Get()->GetRequest().GetRange();
if (awsRange.size()) {
const TString strRange(awsRange.data(), awsRange.size());
std::pair<ui64, ui64> range;
if (!TryParseRange(strRange, range)) {
Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome;
- THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data)));
+ THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data)));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
return;
} else {
@@ -93,41 +98,40 @@ void TFakeExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const {
}
Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome(std::move(awsResult));
- THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data)));
+ THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data)));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
} else {
Aws::Utils::Outcome<Aws::S3::Model::GetObjectResult, Aws::S3::S3Error> awsOutcome;
- THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, awsOutcome, std::move(data)));
+ THolder<TEvGetObjectResponse> result(new TEvGetObjectResponse(key, std::move(awsOutcome), std::move(data)));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
}
void TFakeExternalStorage::Execute(TEvHeadObjectRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
- auto& awsKey = ev->Get()->Request.GetKey();
- const TString key(awsKey.data(), awsKey.size());
-
- auto it = Data.find(key);
- if (it != Data.end()) {
+ auto& bucket = GetBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
+ const TString key = AwsToString(ev->Get()->GetRequest().GetKey());
+ auto object = bucket.GetObject(key);
+ if (object) {
Aws::S3::Model::HeadObjectResult awsResult;
- awsResult.SetETag(MD5::Calc(it->second));
- awsResult.SetContentLength(it->second.size());
+ awsResult.SetETag(MD5::Calc(*object));
+ awsResult.SetContentLength(object->size());
Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome(awsResult);
- THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, awsOutcome));
+ THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, std::move(awsOutcome)));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
} else {
Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome;
- THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, awsOutcome));
+ THolder<TEvHeadObjectResponse> result(new TEvHeadObjectResponse(key, std::move(awsOutcome)));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
}
void TFakeExternalStorage::Execute(TEvPutObjectRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
+ const TString key = AwsToString(ev->Get()->GetRequest().GetKey());
+ auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
+ bucket.PutObject(key, ev->Get()->Body);
Aws::S3::Model::PutObjectResult awsResult;
- auto& awsKey = ev->Get()->Request.GetKey();
- const TString key(awsKey.data(), awsKey.size());
- Data[key] = ev->Get()->Body;
THolder<TEvPutObjectResponse> result(new TEvPutObjectResponse(key, awsResult));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
@@ -136,9 +140,9 @@ void TFakeExternalStorage::Execute(TEvPutObjectRequest::TPtr& ev) const {
void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
Aws::S3::Model::DeleteObjectResult awsResult;
- auto& awsKey = ev->Get()->Request.GetKey();
- const TString key(awsKey.data(), awsKey.size());
- Data.erase(key);
+ auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
+ const TString key = AwsToString(ev->Get()->GetRequest().GetKey());
+ bucket.Remove(key);
THolder<TEvDeleteObjectResponse> result(new TEvDeleteObjectResponse(key, awsResult));
TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
@@ -146,10 +150,11 @@ void TFakeExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const {
void TFakeExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
+ auto& bucket = MutableBucket(AwsToString(ev->Get()->GetRequest().GetBucket()));
Aws::S3::Model::DeleteObjectsResult awsResult;
for (auto&& awsKey : ev->Get()->GetRequest().GetDelete().GetObjects()) {
- const TString key(awsKey.GetKey().data(), awsKey.GetKey().size());
- Data.erase(key);
+ const TString key = AwsToString(awsKey.GetKey());
+ bucket.Remove(key);
Aws::S3::Model::DeletedObject dObject;
dObject.WithKey(key);
awsResult.AddDeleted(std::move(dObject));
@@ -173,21 +178,9 @@ void TFakeExternalStorage::Execute(TEvAbortMultipartUploadRequest::TPtr& /*ev*/)
void TFakeExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const {
TGuard<TMutex> g(Mutex);
- auto& awsKey = ev->Get()->GetRequest().GetKey();
- const TString key(awsKey.data(), awsKey.size());
-
- auto it = Data.find(key);
- if (it != Data.end()) {
- Aws::S3::Model::HeadObjectResult awsResult;
- awsResult.SetETag(MD5::Calc(it->second));
- awsResult.SetContentLength(it->second.size());
- THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(ev->Get()->GetRequest(), awsResult, ev->Get()->GetRequestContext()));
- TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
- } else {
- Aws::Utils::Outcome<Aws::S3::Model::HeadObjectResult, Aws::S3::S3Error> awsOutcome;
- THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(ev->Get()->GetRequest(), awsOutcome, ev->Get()->GetRequestContext()));
- TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
- }
+ auto awsResult = BuildListObjectsResult(ev->Get()->GetRequest());
+ THolder<TEvCheckObjectExistsResponse> result(new TEvCheckObjectExistsResponse(awsResult, ev->Get()->GetRequestContext()));
+ TlsActivationContext->ActorSystem()->Send(ev->Sender, result.Release());
}
}
diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h
index d14a0606b8e..09c23ee7a2a 100644
--- a/ydb/core/wrappers/fake_storage.h
+++ b/ydb/core/wrappers/fake_storage.h
@@ -13,15 +13,79 @@
#include <util/string/printf.h>
namespace NKikimr::NWrappers::NExternalStorage {
+class TFakeBucketStorage {
+private:
+ mutable TMutex Mutex;
+ TMap<TString, TString> Data;
+public:
+ TMap<TString, TString>::const_iterator begin() const {
+ return Data.begin();
+ }
+ TMap<TString, TString>::const_iterator end() const {
+ return Data.end();
+ }
+ ui32 GetSize() const {
+ return Data.size();
+ }
+ void PutObject(const TString& objectId, const TString& data) {
+ TGuard<TMutex> g(Mutex);
+ Data[objectId] = data;
+ }
+ std::optional<TString> GetObject(const TString& objectId) const {
+ TGuard<TMutex> g(Mutex);
+ auto it = Data.find(objectId);
+ if (it == Data.end()) {
+ return {};
+ }
+ return it->second;
+ }
+ void Remove(const TString& objectId) {
+ Data.erase(objectId);
+ }
+};
+
class TFakeExternalStorage {
private:
mutable TMutex Mutex;
- mutable TMap<TString, TString> Data;
+ mutable TMap<TString, TFakeBucketStorage> BucketStorages;
+ TEvListObjectsResponse::TResult BuildListObjectsResult(const TEvListObjectsRequest::TRequest& request) const;
+
+ TString AwsToString(const Aws::String& awsString) const {
+ TString result(awsString.data(), awsString.size());
+ return result;
+ }
public:
TFakeExternalStorage() = default;
- TMap<TString, TString> GetData() const {
+ const TFakeBucketStorage& GetBucket(const TString& bucketId) const {
+ TGuard<TMutex> g(Mutex);
+ auto it = BucketStorages.find(bucketId);
+ if (it == BucketStorages.end()) {
+ it = BucketStorages.emplace(bucketId, TFakeBucketStorage()).first;
+ }
+ return it->second;
+ }
+
+ TFakeBucketStorage& MutableBucket(const TString& bucketId) const {
+ TGuard<TMutex> g(Mutex);
+ auto it = BucketStorages.find(bucketId);
+ if (it == BucketStorages.end()) {
+ it = BucketStorages.emplace(bucketId, TFakeBucketStorage()).first;
+ }
+ return it->second;
+ }
+
+ ui32 GetSize() const {
+ ui32 result = 0;
TGuard<TMutex> g(Mutex);
- return Data;
+ for (auto&& i : BucketStorages) {
+ result += i.second.GetSize();
+ }
+ return result;
+ }
+
+ ui32 GetBucketsCount() const {
+ TGuard<TMutex> g(Mutex);
+ return BucketStorages.size();
}
void Execute(TEvListObjectsRequest::TPtr& ev) const;
void Execute(TEvGetObjectRequest::TPtr& ev) const;
@@ -37,39 +101,56 @@ public:
};
class TFakeExternalStorageOperator: public IExternalStorageOperator {
+private:
+ const TString Bucket;
public:
- TFakeExternalStorageOperator() = default;
+ TFakeExternalStorageOperator(const TString& bucket)
+ : Bucket(bucket)
+ {
+
+ }
virtual void Execute(TEvCheckObjectExistsRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvListObjectsRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvGetObjectRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvHeadObjectRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvPutObjectRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvDeleteObjectRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvDeleteObjectsRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvUploadPartRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
virtual void Execute(TEvAbortMultipartUploadRequest::TPtr& ev) const override {
+ ev->Get()->MutableRequest().WithBucket(Bucket);
Singleton<TFakeExternalStorage>()->Execute(ev);
}
};
diff --git a/ydb/core/wrappers/fake_storage_config.cpp b/ydb/core/wrappers/fake_storage_config.cpp
index 29da4e8c244..0d2f9650f2a 100644
--- a/ydb/core/wrappers/fake_storage_config.cpp
+++ b/ydb/core/wrappers/fake_storage_config.cpp
@@ -10,7 +10,7 @@ TString TFakeExternalStorageConfig::DoGetStorageId() const {
}
IExternalStorageOperator::TPtr TFakeExternalStorageConfig::DoConstructStorageOperator() const {
- return std::make_shared<TFakeExternalStorageOperator>();
+ return std::make_shared<TFakeExternalStorageOperator>(Bucket);
}
}
diff --git a/ydb/core/wrappers/fake_storage_config.h b/ydb/core/wrappers/fake_storage_config.h
index 01a62b81a34..e6ce528a7c5 100644
--- a/ydb/core/wrappers/fake_storage_config.h
+++ b/ydb/core/wrappers/fake_storage_config.h
@@ -12,10 +12,17 @@
namespace NKikimr::NWrappers::NExternalStorage {
class TFakeExternalStorageConfig: public IExternalStorageConfig {
+private:
+ const TString Bucket;
protected:
virtual TString DoGetStorageId() const override;
virtual IExternalStorageOperator::TPtr DoConstructStorageOperator() const override;
public:
+ TFakeExternalStorageConfig(const TString& bucket)
+ : Bucket(bucket)
+ {
+
+ }
};
} // NKikimr::NWrappers::NExternalStorage
diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp
index 86cb5a86f21..55912060365 100644
--- a/ydb/core/wrappers/s3_storage.cpp
+++ b/ydb/core/wrappers/s3_storage.cpp
@@ -120,9 +120,9 @@ private:
public:
using TBase::Send;
using TBase::TBase;
- void Reply(const typename TBase::TRequest& request, const typename TBase::TOutcome& outcome) const {
+ void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const {
Y_VERIFY(!std::exchange(TBase::Replied, true), "Double-reply");
- Send(MakeHolder<TEvCheckObjectExistsResponse>(request, outcome, RequestContext).Release());
+ Send(MakeHolder<TEvCheckObjectExistsResponse>(outcome, RequestContext).Release());
}
};
@@ -224,7 +224,7 @@ public:
using TContextBase<TEvRequest, TEvResponse>::TContextBase;
const TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override {
- auto& request = ev->Get()->Request;
+ auto& request = ev->Get()->MutableRequest();
Buffer = std::move(ev->Get()->Body);
request.SetBody(MakeShared<DefaultUnderlyingStream>("StreamContext",
@@ -253,7 +253,7 @@ void TS3ExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const {
void TS3ExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const {
Call<TEvCheckObjectExistsRequest, TEvCheckObjectExistsResponse, TContextBase>(
- ev, &S3Client::HeadObjectAsync);
+ ev, &S3Client::ListObjectsAsync);
}
void TS3ExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const {
diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h
index 11009c2171a..c5f239b1a42 100644
--- a/ydb/core/wrappers/s3_storage.h
+++ b/ydb/core/wrappers/s3_storage.h
@@ -32,6 +32,7 @@ private:
THolder<Aws::S3::S3Client> Client;
const Aws::Client::ClientConfiguration Config;
const Aws::Auth::AWSCredentials Credentials;
+ const TString Bucket;
template <typename TRequest, typename TOutcome>
using THandler = std::function<void(const Aws::S3::S3Client*, const TRequest&, const TOutcome&, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)>;
@@ -42,6 +43,7 @@ private:
template <typename TEvRequest, typename TEvResponse, template <typename...> typename TContext>
void Call(typename TEvRequest::TPtr& ev, TFunc<typename TEvRequest::TRequest, typename TEvResponse::TOutcome> func) const {
using TCtx = TContext<TEvRequest, TEvResponse>;
+ ev->Get()->MutableRequest().WithBucket(Bucket);
auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender, ev->Get()->GetRequestContext());
auto callback = [](
@@ -64,10 +66,12 @@ private:
}
public:
- TS3ExternalStorage(const Aws::Client::ClientConfiguration& config, const Aws::Auth::AWSCredentials& credentials)
+ TS3ExternalStorage(const Aws::Client::ClientConfiguration& config, const Aws::Auth::AWSCredentials& credentials, const TString& bucket)
: Client(new Aws::S3::S3Client(credentials, config))
, Config(config)
- , Credentials(credentials) {
+ , Credentials(credentials)
+ , Bucket(bucket)
+ {
}
~TS3ExternalStorage();
diff --git a/ydb/core/wrappers/s3_storage_config.cpp b/ydb/core/wrappers/s3_storage_config.cpp
index f0ce4964274..0032e45a519 100644
--- a/ydb/core/wrappers/s3_storage_config.cpp
+++ b/ydb/core/wrappers/s3_storage_config.cpp
@@ -183,8 +183,30 @@ TString TS3ExternalStorageConfig::DoGetStorageId() const {
}
IExternalStorageOperator::TPtr TS3ExternalStorageConfig::DoConstructStorageOperator() const {
- return std::make_shared<TS3ExternalStorage>(Config, Credentials);
+ return std::make_shared<TS3ExternalStorage>(Config, Credentials, Bucket);
}
+
+TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings): Config(ConfigFromSettings(settings))
+, Credentials(CredentialsFromSettings(settings))
+{
+ Bucket = settings.bucket();
+}
+
+TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials,
+ const Aws::Client::ClientConfiguration& config, const TString& bucket)
+ : Config(config)
+ , Credentials(credentials)
+{
+ Bucket = bucket;
+}
+
+TS3ExternalStorageConfig::TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings)
+ : Config(ConfigFromSettings(settings))
+ , Credentials(CredentialsFromSettings(settings))
+{
+ Bucket = settings.GetBucket();
+}
+
}
#endif // KIKIMR_DISABLE_S3_OPS
diff --git a/ydb/core/wrappers/s3_storage_config.h b/ydb/core/wrappers/s3_storage_config.h
index 7ecefab799e..07fb87b9fb3 100644
--- a/ydb/core/wrappers/s3_storage_config.h
+++ b/ydb/core/wrappers/s3_storage_config.h
@@ -6,6 +6,7 @@
#include <ydb/core/base/events.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/library/accessor/accessor.h>
#include <ydb/public/api/protos/ydb_import.pb.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/auth/AWSCredentials.h>
@@ -24,6 +25,7 @@ struct TS3User {
class TS3ExternalStorageConfig: public IExternalStorageConfig, TS3User {
private:
+ YDB_READONLY_DEF(TString, Bucket);
Aws::Client::ClientConfiguration Config;
const Aws::Auth::AWSCredentials Credentials;
@@ -44,21 +46,9 @@ public:
return Config;
}
- TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings)
- : Config(ConfigFromSettings(settings))
- , Credentials(CredentialsFromSettings(settings)) {
-
- }
- TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings)
- : Config(ConfigFromSettings(settings))
- , Credentials(CredentialsFromSettings(settings)) {
-
- }
- TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, const Aws::Client::ClientConfiguration& config)
- : Config(config)
- , Credentials(credentials) {
-
- }
+ TS3ExternalStorageConfig(const NKikimrSchemeOp::TS3Settings& settings);
+ TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings);
+ TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, const Aws::Client::ClientConfiguration& config, const TString& bucket);
};
} // NKikimr::NWrappers::NExternalStorage
diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp
index 42474eb605f..84d7479dd63 100644
--- a/ydb/core/wrappers/s3_wrapper.cpp
+++ b/ydb/core/wrappers/s3_wrapper.cpp
@@ -23,47 +23,9 @@ namespace NKikimr::NWrappers {
namespace NExternalStorage {
class TS3Wrapper: public TActor<TS3Wrapper> {
- void Handle(TEvListObjectsRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvGetObjectRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvCheckObjectExistsRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvHeadObjectRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvPutObjectRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvDeleteObjectRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvDeleteObjectsRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvCreateMultipartUploadRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvUploadPartRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
-
- void Handle(TEvCompleteMultipartUploadRequest::TPtr& ev) {
- CSOperator->Execute(ev);
- }
- void Handle(TEvAbortMultipartUploadRequest::TPtr& ev) {
+ template <class T>
+ void Handle(T& ev) {
CSOperator->Execute(ev);
}
@@ -74,7 +36,9 @@ public:
explicit TS3Wrapper(IExternalStorageOperator::TPtr csOperator)
: TActor(&TThis::StateWork)
- , CSOperator(csOperator) {
+ , CSOperator(csOperator)
+ {
+ Y_VERIFY(!!CSOperator, "not initialized operator. incorrect config.");
}
virtual ~TS3Wrapper() = default;
diff --git a/ydb/core/wrappers/s3_wrapper_ut.cpp b/ydb/core/wrappers/s3_wrapper_ut.cpp
index 90724831c92..b864e183010 100644
--- a/ydb/core/wrappers/s3_wrapper_ut.cpp
+++ b/ydb/core/wrappers/s3_wrapper_ut.cpp
@@ -43,7 +43,7 @@ public:
Runtime = MakeHolder<TTestBasicRuntime>();
Runtime->Initialize(TAppPrepare().Unwrap());
Runtime->SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG);
- NWrappers::IExternalStorageConfig::TPtr config = std::make_shared<NExternalStorage::TS3ExternalStorageConfig>(Aws::Auth::AWSCredentials(), MakeClientConfig(*Port));
+ NWrappers::IExternalStorageConfig::TPtr config = std::make_shared<NExternalStorage::TS3ExternalStorageConfig>(Aws::Auth::AWSCredentials(), MakeClientConfig(*Port), "TEST");
Wrapper = Runtime->Register(NWrappers::CreateS3Wrapper(config->ConstructStorageOperator()));
}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
index 93a41f8c481..6d01ae8ca2c 100644
--- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
@@ -23,7 +23,7 @@ void TDSAccessorNotifier::Handle(TEvSubscribe::TPtr& ev) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot";
return;
}
- Send(ev->Get()->GetSubscriberId(), new TEvRefreshSubscriberData(snapshot));
+ Sender<TEvRefreshSubscriberData>(snapshot).SendTo(ev->Get()->GetSubscriberId());
}
}
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index d75d258450b..faa76a15878 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -20,7 +20,11 @@ void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser());
it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first;
}
- Send<TEvSubscribe>(it->second, ev->Sender);
+ if (!!ev->Get()->GetSubscriberId()) {
+ Send<TEvSubscribe>(it->second, ev->Get()->GetSubscriberId());
+ } else {
+ Send<TEvSubscribe>(it->second, ev->Sender);
+ }
}
void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h
index 72800a06d21..888ec5763b4 100644
--- a/ydb/services/metadata/service.h
+++ b/ydb/services/metadata/service.h
@@ -7,9 +7,12 @@ namespace NKikimr::NMetadataProvider {
class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> {
private:
YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
+ YDB_READONLY_DEF(NActors::TActorId, SubscriberId);
public:
- TEvSubscribeExternal(ISnapshotParser::TPtr parser)
- : SnapshotParser(parser) {
+ TEvSubscribeExternal(ISnapshotParser::TPtr parser, const NActors::TActorId& subscriberId = {})
+ : SnapshotParser(parser)
+ , SubscriberId(subscriberId)
+ {
Y_VERIFY(!!SnapshotParser);
}
};