aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@ydb.tech>2025-03-31 21:26:02 +0300
committerGitHub <noreply@github.com>2025-03-31 21:26:02 +0300
commit4ae39ac0db24e7564f13a3d0545a540bd647283c (patch)
tree7eb2cb7c2312f2910191011e0c0baa0a1c38e95b
parentba56e6037c80d57513c9acc90f517e7c0e44ce4c (diff)
downloadydb-4ae39ac0db24e7564f13a3d0545a540bd647283c.tar.gz
Implement encrypted import (#16434)
-rw-r--r--ydb/core/backup/common/metadata.cpp44
-rw-r--r--ydb/core/backup/common/metadata.h17
-rw-r--r--ydb/core/protos/flat_scheme_op.proto9
-rw-r--r--ydb/core/tx/datashard/extstorage_usage_config.h45
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp21
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__create.cpp17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import.cpp28
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import__create.cpp246
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp550
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp25
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h39
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_private.h13
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h6
-rw-r--r--ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp70
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema14
22 files changed, 931 insertions, 242 deletions
diff --git a/ydb/core/backup/common/metadata.cpp b/ydb/core/backup/common/metadata.cpp
index fd70983aca..d54b879861 100644
--- a/ydb/core/backup/common/metadata.cpp
+++ b/ydb/core/backup/common/metadata.cpp
@@ -53,4 +53,48 @@ TMetadata TMetadata::Deserialize(const TString& metadata) {
return result;
}
+bool TSchemaMapping::Deserialize(const TString& jsonContent, TString& error) {
+ NJson::TJsonValue json;
+ if (!NJson::ReadJsonTree(jsonContent, &json)) {
+ error = "Failed to parse schema mapping json";
+ return false;
+ }
+ const NJson::TJsonValue& mapping = json["exportedObjects"];
+ if (!mapping.IsMap()) {
+ error = "No mapping in mapping json";
+ return false;
+ }
+
+ bool hasIV = false;
+ bool first = true;
+ for (const auto& [exportObject, info] : mapping.GetMap()) {
+ const NJson::TJsonValue& exportPrefix = info["exportPrefix"];
+ const NJson::TJsonValue& iv = info["iv"];
+ if (!exportPrefix.IsString()) {
+ error = "Incorrect exportPrefix";
+ return false;
+ }
+ if (first) {
+ hasIV = info.Has("iv");
+ } else {
+ first = false;
+ if (hasIV != info.Has("iv")) {
+ error = "Incorrect iv in schema mapping json";
+ return false;
+ }
+ }
+ if (hasIV && !iv.IsString()) {
+ error = "IV in schema mapping json is not a string";
+ return false;
+ }
+ TItem& item = Items.emplace_back();
+ item.ExportPrefix = exportPrefix.GetString();
+ item.ObjectPath = exportObject;
+ if (hasIV) {
+ item.IV = TEncryptionIV::FromHexString(iv.GetString());
+ }
+ }
+ return true;
+}
+
}
diff --git a/ydb/core/backup/common/metadata.h b/ydb/core/backup/common/metadata.h
index f92e5e4fd6..7479f7ee41 100644
--- a/ydb/core/backup/common/metadata.h
+++ b/ydb/core/backup/common/metadata.h
@@ -1,4 +1,5 @@
#pragma once
+#include "encryption.h"
#include <ydb/core/base/row_version.h>
@@ -56,4 +57,20 @@ private:
TMaybeFail<ui64> Version;
};
+class TSchemaMapping {
+public:
+ struct TItem {
+ TString ExportPrefix;
+ TString ObjectPath;
+ TMaybe<NBackup::TEncryptionIV> IV;
+ };
+
+ TSchemaMapping() = default;
+
+ bool Deserialize(const TString& jsonContent, TString& error);
+
+public:
+ std::vector<TItem> Items;
+};
+
} // namespace NKikimr::NBackup
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 9d0f3fdfbe..e048e9051a 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1268,6 +1268,15 @@ message TRestoreTask {
}
optional bool ValidateChecksums = 7; // currently available for s3
+
+ message TEncryptionSettings {
+ optional bytes IV = 1;
+ oneof Key {
+ Ydb.Export.EncryptionSettings.SymmetricKey SymmetricKey = 2;
+ }
+ }
+
+ optional TEncryptionSettings EncryptionSettings = 8;
}
message TPersQueueGroupAllocate {
diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h
index 1dc2c6afed..bd84fd91ef 100644
--- a/ydb/core/tx/datashard/extstorage_usage_config.h
+++ b/ydb/core/tx/datashard/extstorage_usage_config.h
@@ -25,21 +25,11 @@ public:
const TMaybe<NBackup::TEncryptionIV> IV;
static TEncryptionSettings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) {
- if (task.HasEncryptionSettings()) {
- return TEncryptionSettings{
- .EncryptedBackup = true,
- .EncryptionAlgorithm = task.GetEncryptionSettings().GetEncryptionAlgorithm(),
- .Key = NBackup::TEncryptionKey(task.GetEncryptionSettings().GetSymmetricKey().key()),
- .IV = NBackup::TEncryptionIV::FromBinaryString(task.GetEncryptionSettings().GetIV()),
- };
- } else {
- return TEncryptionSettings{
- .EncryptedBackup = false,
- .EncryptionAlgorithm = {},
- .Key = Nothing(),
- .IV = Nothing(),
- };
- }
+ return FromProto(task);
+ }
+
+ static TEncryptionSettings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) {
+ return FromProto(task);
}
TMaybe<NBackup::TEncryptionIV> GetMetadataIV() const {
@@ -70,6 +60,29 @@ public:
}
return iv;
}
+
+ template <class TProto>
+ static TEncryptionSettings FromProto(const TProto& task) {
+ if (task.HasEncryptionSettings()) {
+ TString algorithm;
+ if constexpr (std::is_same_v<TProto, NKikimrSchemeOp::TBackupTask>) {
+ algorithm = task.GetEncryptionSettings().GetEncryptionAlgorithm();
+ }
+ return TEncryptionSettings{
+ .EncryptedBackup = true,
+ .EncryptionAlgorithm = algorithm,
+ .Key = NBackup::TEncryptionKey(task.GetEncryptionSettings().GetSymmetricKey().key()),
+ .IV = NBackup::TEncryptionIV::FromBinaryString(task.GetEncryptionSettings().GetIV()),
+ };
+ } else {
+ return TEncryptionSettings{
+ .EncryptedBackup = false,
+ .EncryptionAlgorithm = {},
+ .Key = Nothing(),
+ .IV = Nothing(),
+ };
+ }
+ }
};
public:
const TString Bucket;
@@ -93,7 +106,7 @@ public:
}
static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) {
- return TS3Settings(task.GetS3Settings(), task.GetShardNum(), TEncryptionSettings{});
+ return TS3Settings(task.GetS3Settings(), task.GetShardNum(), TEncryptionSettings::FromRestoreTask(task));
}
inline const TString& GetBucket() const { return Bucket; }
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index aa90e9da6f..035e5230ba 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -488,17 +488,34 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec));
}
+ THolder<IReadController> reader;
switch (CompressionCodec) {
case NBackupRestoreTraits::ECompressionCodec::None:
- Reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit));
+ reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit));
break;
case NBackupRestoreTraits::ECompressionCodec::Zstd:
- Reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit));
+ reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit));
break;
case NBackupRestoreTraits::ECompressionCodec::Invalid:
Y_ENSURE(false, "unreachable");
}
+ if (Settings.EncryptionSettings.EncryptedBackup) {
+ NBackup::TEncryptionIV expectedIV = NBackup::TEncryptionIV::Combine(
+ *Settings.EncryptionSettings.IV,
+ NBackup::EBackupFileType::TableData,
+ 0 /* already combined */,
+ Settings.Shard
+ );
+ Reader = MakeHolder<TEncryptionDeserializerController>(
+ *Settings.EncryptionSettings.Key,
+ expectedIV,
+ std::move(reader)
+ );
+ } else {
+ Reader = std::move(reader);
+ }
+
ETag = result.GetResult().GetETag();
ContentLength = result.GetResult().GetContentLength();
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index c97a392a51..1b346d40ee 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4431,6 +4431,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
switch (importInfo->State) {
+ case TImportInfo::EState::DownloadExportMetadata:
case TImportInfo::EState::Waiting:
case TImportInfo::EState::Cancellation:
ImportsToResume.push_back(importInfo->Id);
@@ -4507,6 +4508,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
item.NextChangefeedIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextChangefeedIdx>(0);
item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString());
+ item.SrcPrefix = rowset.GetValueOrDefault<Schema::ImportItems::SrcPrefix>(TString());
+ if (rowset.HaveValue<Schema::ImportItems::EncryptionIV>()) {
+ item.ExportItemIV = NBackup::TEncryptionIV::FromBinaryString(rowset.GetValue<Schema::ImportItems::EncryptionIV>());
+ }
if (item.WaitTxId != InvalidTxId) {
Self->TxIdToImport[item.WaitTxId] = {importId, itemIdx};
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
index 099c564a3e..ea97b3310e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
@@ -209,12 +209,25 @@ private:
return true;
}
+ TString GetCommonSourcePath(const Ydb::Export::ExportToS3Settings& settings) {
+ return settings.source_path();
+ }
+
+ TString GetCommonSourcePath(const Ydb::Export::ExportToYtSettings&) {
+ return {};
+ }
+
template <typename TSettings>
bool FillItems(TExportInfo::TPtr exportInfo, const TSettings& settings, TString& explain) {
+ TString commonSourcePath = GetCommonSourcePath(settings);
+ if (commonSourcePath && commonSourcePath.back() != '/') {
+ commonSourcePath.push_back('/');
+ }
exportInfo->Items.reserve(settings.items().size());
for (ui32 itemIdx : xrange(settings.items().size())) {
const auto& item = settings.items(itemIdx);
- const TPath path = TPath::Resolve(item.source_path(), Self);
+ const TString srcPath = commonSourcePath + item.source_path();
+ const TPath path = TPath::Resolve(srcPath, Self);
{
TPath::TChecker checks = path.Check();
checks
@@ -230,7 +243,7 @@ private:
}
}
- exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType);
+ exportInfo->Items.emplace_back(srcPath, path.Base()->PathId, path->PathType);
exportInfo->PendingItems.push_back(itemIdx);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp b/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp
index e8f81a6946..2851f91ffb 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp
@@ -477,7 +477,6 @@ private:
}
writer.CloseMap();
}
- writer.Write("exportedObjects", "SchemaMappingV0");
writer.CloseMap();
writer.CloseMap();
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index f0b26158dd..518bcbbf79 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -4949,6 +4949,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvImport::TEvForgetImportRequest, Handle);
HFuncTraced(TEvImport::TEvListImportsRequest, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle);
+ HFuncTraced(TEvPrivate::TEvImportSchemaMappingReady, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle);
// } // NImport
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 6f3ea3f723..9b4a405f96 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1305,6 +1305,7 @@ public:
void FromXxportInfo(NKikimrImport::TImport& exprt, const TImportInfo::TPtr importInfo);
static void PersistCreateImport(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
+ static void PersistSchemaMappingImportFields(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
static void PersistRemoveImport(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
@@ -1333,6 +1334,7 @@ public:
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe<ui32>& itemIdx = Nothing());
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev);
+ NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
@@ -1345,6 +1347,7 @@ public:
void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx);
void ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp
index 05a002eee5..2ccd2d7572 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp
@@ -60,6 +60,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
}
switch (importInfo->State) {
+ case TImportInfo::EState::DownloadExportMetadata:
+ import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_PREPARING);
+ break;
case TImportInfo::EState::Waiting:
switch (GetMinState(importInfo)) {
case TImportInfo::EState::GetScheme:
@@ -136,8 +139,27 @@ void TSchemeShard::PersistCreateImport(NIceDb::TNiceDb& db, const TImportInfo::T
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::DstPathName>(item.DstPathName),
- NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State))
+ NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)),
+ NIceDb::TUpdate<Schema::ImportItems::SrcPrefix>(item.SrcPrefix)
+ );
+ }
+}
+
+void TSchemeShard::PersistSchemaMappingImportFields(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo) {
+ // There can be new items, so do at least the same as for creation
+ for (ui32 itemIdx : xrange(importInfo->Items.size())) {
+ const auto& item = importInfo->Items.at(itemIdx);
+
+ db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
+ NIceDb::TUpdate<Schema::ImportItems::DstPathName>(item.DstPathName),
+ NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)),
+ NIceDb::TUpdate<Schema::ImportItems::SrcPrefix>(item.SrcPrefix)
);
+ if (item.ExportItemIV) {
+ db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
+ NIceDb::TUpdate<Schema::ImportItems::EncryptionIV>(item.ExportItemIV->GetBinaryString())
+ );
+ }
}
}
@@ -244,6 +266,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct
Execute(CreateTxProgressImport(ev), ctx);
}
+void TSchemeShard::Handle(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev, const TActorContext& ctx) {
+ Execute(CreateTxProgressImport(ev), ctx);
+}
+
void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressImport(ev), ctx);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp
index d494c514c3..efed43d23d 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp
@@ -54,6 +54,7 @@ struct TSchemeShard::TImport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
case TImportInfo::EState::Cancelled:
return respond(Ydb::StatusIds::SUCCESS);
+ case TImportInfo::EState::DownloadExportMetadata:
case TImportInfo::EState::Waiting:
case TImportInfo::EState::Cancellation:
importInfo->Issue = "Cancelled manually";
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
index 98764f8f9b..5a0d4b9ddd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
@@ -67,6 +67,42 @@ TString GetDatabase(TSchemeShard& ss) {
return CanonizePath(ss.RootPathElements);
}
+bool ValidateDstPath(const TString& dstPath, TSchemeShard* ss, TString& explain) {
+ const TPath path = TPath::Resolve(dstPath, ss);
+ TPath::TChecker checks = path.Check();
+ checks
+ .IsAtLocalSchemeShard()
+ .HasResolvedPrefix()
+ .FailOnRestrictedCreateInTempZone();
+
+ if (path.IsResolved()) {
+ checks
+ .IsResolved()
+ .IsDeleted();
+ } else {
+ checks
+ .NotEmpty()
+ .NotResolved();
+ }
+
+ if (checks) {
+ checks
+ .IsValidLeafName()
+ .DepthLimit()
+ .PathsLimit();
+
+ if (path.Parent().IsResolved()) {
+ checks.DirChildrenLimit();
+ }
+ }
+
+ if (!checks) {
+ explain = checks.GetError();
+ return false;
+ }
+ return true;
+}
+
}
struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
@@ -141,6 +177,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
TImportInfo::TPtr importInfo = nullptr;
+ TImportInfo::EState initialState = TImportInfo::EState::Waiting;
switch (request.GetRequest().GetSettingsCase()) {
case NKikimrImport::TCreateImportRequest::kImportFromS3Settings:
@@ -150,6 +187,10 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
settings.set_scheme(Ydb::Import::ImportFromS3Settings::HTTPS);
}
+ if (!settings.source_prefix().empty() && AppData()->FeatureFlags.GetEnableEncryptedExport()) {
+ initialState = TImportInfo::EState::DownloadExportMetadata;
+ }
+
importInfo = new TImportInfo(id, uid, TImportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName());
if (request.HasUserSID()) {
@@ -172,7 +213,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
NIceDb::TNiceDb db(txc.DB);
Self->PersistCreateImport(db, importInfo);
- importInfo->State = TImportInfo::EState::Waiting;
+ importInfo->State = initialState;
importInfo->StartTime = TAppData::TimeProvider->Now();
Self->PersistImportState(db, importInfo);
@@ -232,42 +273,12 @@ private:
return false;
}
- const TPath path = TPath::Resolve(dstPath, Self);
- {
- TPath::TChecker checks = path.Check();
- checks
- .IsAtLocalSchemeShard()
- .HasResolvedPrefix()
- .FailOnRestrictedCreateInTempZone();
-
- if (path.IsResolved()) {
- checks
- .IsResolved()
- .IsDeleted();
- } else {
- checks
- .NotEmpty()
- .NotResolved();
- }
-
- if (checks) {
- checks
- .IsValidLeafName()
- .DepthLimit()
- .PathsLimit();
-
- if (path.Parent().IsResolved()) {
- checks.DirChildrenLimit();
- }
- }
-
- if (!checks) {
- explain = checks.GetError();
- return false;
- }
+ if (!ValidateDstPath(dstPath, Self, explain)) {
+ return false;
}
- importInfo->Items.emplace_back(dstPath);
+ auto& item = importInfo->Items.emplace_back(dstPath);
+ item.SrcPrefix = settings.items(itemIdx).source_prefix();
}
return true;
@@ -284,6 +295,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TMaybe<ui32> ItemIdx;
TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr;
+ TEvPrivate::TEvImportSchemaMappingReady::TPtr SchemaMappingResult = nullptr;
TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
@@ -303,6 +315,13 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}
+ explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev)
+ : TXxport::TTxBase(self)
+ , Id(ev->Get()->ImportId)
+ , SchemaMappingResult(ev)
+ {
+ }
+
explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev)
: TXxport::TTxBase(self)
, SchemeQueryResult(ev)
@@ -342,6 +361,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
if (SchemeResult) {
OnSchemeResult(txc, ctx);
+ } else if (SchemaMappingResult) {
+ OnSchemaMappingResult(txc, ctx);
} else if (SchemeQueryResult) {
OnSchemeQueryPreparation(txc, ctx);
} else if (AllocateResult) {
@@ -372,10 +393,18 @@ private:
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx));
- item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx));
+ item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx, item.ExportItemIV));
Self->RunningImportSchemeGetters.emplace(item.SchemeGetter);
}
+ void GetSchemaMapping(TImportInfo::TPtr importInfo, const TActorContext& ctx) {
+ LOG_I("TImport::TTxProgress: Download schema mapping"
+ << ": info# " << importInfo->ToString());
+
+ importInfo->SchemaMappingGetter = ctx.RegisterWithSameMailbox(CreateSchemaMappingGetter(Self->SelfId(), importInfo));
+ Self->RunningImportSchemeGetters.emplace(importInfo->SchemaMappingGetter);
+ }
+
void CreateTable(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
@@ -685,15 +714,27 @@ private:
}
void Cancel(TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf marker) {
- Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
- const auto& item = importInfo->Items.at(itemIdx);
+ const TItem* item = nullptr;
+ if (itemIdx != ui32(-1)) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ item = &importInfo->Items.at(itemIdx);
+ }
+ TStringBuilder itemLogStr;
+ if (item) {
+ itemLogStr << ", item# " << item->ToString(itemIdx);
+ }
LOG_N("TImport::TTxProgress: " << marker << ", cancelling"
<< ", info# " << importInfo->ToString()
- << ", item# " << item.ToString(itemIdx));
+ << itemLogStr);
importInfo->State = EState::Cancelled;
+ if (auto schemaMappingGetter = std::exchange(importInfo->SchemaMappingGetter, {})) {
+ Send(schemaMappingGetter, new TEvents::TEvPoisonPill());
+ Self->RunningImportSchemeGetters.erase(schemaMappingGetter);
+ }
+
for (ui32 i : xrange(importInfo->Items.size())) {
KillChildActors(importInfo->Items[i]);
if (i == itemIdx) {
@@ -720,14 +761,16 @@ private:
}
void CancelAndPersist(NIceDb::TNiceDb& db, TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf itemIssue, TStringBuf marker) {
- Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
- auto& item = importInfo->Items[itemIdx];
+ if (itemIdx != ui32(-1)) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ auto& item = importInfo->Items[itemIdx];
- item.Issue = itemIssue;
- PersistImportItemState(db, importInfo, itemIdx);
+ item.Issue = itemIssue;
+ PersistImportItemState(db, importInfo, itemIdx);
- if (importInfo->State != EState::Waiting) {
- return;
+ if (importInfo->State != EState::Waiting) {
+ return;
+ }
}
Cancel(importInfo, itemIdx, marker);
@@ -801,11 +844,20 @@ private:
<< ": id# " << Id
<< ", itemIdx# " << ItemIdx);
- if (ItemIdx) {
- Resume(importInfo, *ItemIdx, txc, ctx);
- } else {
- for (ui32 itemIdx : xrange(importInfo->Items.size())) {
- Resume(importInfo, itemIdx, txc, ctx);
+ switch (importInfo->State) {
+ case EState::DownloadExportMetadata: {
+ GetSchemaMapping(importInfo, ctx);
+ break;
+ }
+ default: {
+ if (ItemIdx) {
+ Resume(importInfo, *ItemIdx, txc, ctx);
+ } else {
+ for (ui32 itemIdx : xrange(importInfo->Items.size())) {
+ Resume(importInfo, itemIdx, txc, ctx);
+ }
+ }
+ break;
}
}
}
@@ -943,7 +995,7 @@ private:
// Send the creation query to KQP to prepare.
const auto database = GetDatabase(*Self);
const TString source = TStringBuilder()
- << importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName;
+ << importInfo->GetItemSrcPrefix(msg.ItemIdx) << NYdb::NDump::NFiles::CreateView().FileName;
NYql::TIssues issues;
if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, issues)) {
@@ -965,6 +1017,94 @@ private:
}
}
+ void OnSchemaMappingResult(TTransactionContext& txc, const TActorContext& ctx) {
+ Y_ABORT_UNLESS(SchemaMappingResult);
+
+ const auto& msg = *SchemaMappingResult->Get();
+
+ LOG_D("TImport::TTxProgress: OnSchemaMappingResult"
+ << ": id# " << msg.ImportId
+ << ", success# " << msg.Success
+ );
+
+ if (!Self->Imports.contains(msg.ImportId)) {
+ LOG_E("TImport::TTxProgress: OnSchemaMappingResult received unknown id"
+ << ": id# " << msg.ImportId);
+ return;
+ }
+
+ TImportInfo::TPtr importInfo = Self->Imports.at(msg.ImportId);
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ Self->RunningImportSchemeGetters.erase(std::exchange(importInfo->SchemaMappingGetter, {}));
+
+ if (!msg.Success) {
+ return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot get schema mapping: " << msg.Error);
+ }
+
+ if (!importInfo->SchemaMapping->Items.empty()) {
+ if (importInfo->Settings.has_encryption_settings() != importInfo->SchemaMapping->Items[0].IV.Defined()) {
+ return CancelAndPersist(db, importInfo, -1, {}, "incorrect schema mapping");
+ }
+ }
+
+ // Path in database for import
+ TVector<TString> dstPath;
+ if (importInfo->Settings.destination_path().empty()) {
+ dstPath = Self->RootPathElements;
+ } else {
+ dstPath = SplitPath(importInfo->Settings.destination_path());
+ }
+ TString sourcePrefix = importInfo->Settings.source_prefix();
+ if (sourcePrefix && sourcePrefix.back() != '/') {
+ sourcePrefix.push_back('/');
+ }
+ auto combineDstPath = [&](const TString& mappingObjectPath) {
+ TVector<TString> objectPath = SplitPath(mappingObjectPath);
+ TVector<TString> dstObjectPath = dstPath;
+ dstObjectPath.insert(dstObjectPath.end(), objectPath.begin(), objectPath.end());
+ return CombinePath(dstObjectPath.begin(), dstObjectPath.end());
+ };
+ auto init = [&](const NBackup::TSchemaMapping::TItem& schemaMappingItem, NSchemeShard::TImportInfo::TItem& item) {
+ TStringBuf exportPrefix(schemaMappingItem.ExportPrefix);
+ exportPrefix.SkipPrefix("/");
+ item.SrcPrefix = TStringBuilder() << sourcePrefix << exportPrefix;
+ item.ExportItemIV = schemaMappingItem.IV;
+ };
+ if (importInfo->Items.empty()) { // Fill the whole list from schema mapping
+ for (const auto& schemaMappingItem : importInfo->SchemaMapping->Items) {
+ TString dstPath = combineDstPath(schemaMappingItem.ObjectPath);
+ TString explain;
+ if (!ValidateDstPath(dstPath, Self, explain)) {
+ return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot validate mapping: " << explain);
+ }
+
+ auto& item = importInfo->Items.emplace_back(dstPath);
+ init(schemaMappingItem, item);
+ }
+ } else { // Take existing items from items list
+ THashMap<TString, size_t> schemaMappingIndex;
+ for (size_t i = 0; i < importInfo->SchemaMapping->Items.size(); ++i) {
+ schemaMappingIndex[combineDstPath(importInfo->SchemaMapping->Items[i].ObjectPath)] = i;
+ }
+ for (auto& item : importInfo->Items) {
+ TString dstPath = CanonizePath(item.DstPathName);
+ auto mappingIt = schemaMappingIndex.find(dstPath);
+ if (mappingIt == schemaMappingIndex.end()) {
+ return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot find path " << dstPath << " in schema mapping");
+ }
+ const auto& schemaMappingItem = importInfo->SchemaMapping->Items[mappingIt->second];
+ init(schemaMappingItem, item);
+ }
+ }
+
+ importInfo->State = EState::Waiting;
+ PersistImportState(db, importInfo);
+ PersistSchemaMappingImportFields(db, importInfo);
+ Resume(txc, ctx);
+ }
+
void OnSchemeQueryPreparation(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(SchemeQueryResult);
const auto& message = *SchemeQueryResult.Get()->Get();
@@ -1408,6 +1548,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe
return new TImport::TTxProgress(this, ev);
}
+ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev) {
+ return new TImport::TTxProgress(this, ev);
+}
+
ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) {
return new TImport::TTxProgress(this, ev);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
index 01984722d3..745df56387 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
@@ -144,6 +144,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose(
task.SetTableName(dstPath.LeafName());
*task.MutableTableDescription() = RebuildTableDescription(GetTableDescription(ss, item.DstPathId), item.Scheme);
+ if (importInfo->Settings.has_encryption_settings()) {
+ auto& taskEncryptionSettings = *task.MutableEncryptionSettings();
+ *taskEncryptionSettings.MutableSymmetricKey() = importInfo->Settings.encryption_settings().symmetric_key();
+ if (item.ExportItemIV) {
+ taskEncryptionSettings.SetIV(item.ExportItemIV->GetBinaryString());
+ }
+ }
+
switch (importInfo->Kind) {
case TImportInfo::EKind::S3:
{
@@ -153,7 +161,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose(
restoreSettings.SetBucket(importInfo->Settings.bucket());
restoreSettings.SetAccessKey(importInfo->Settings.access_key());
restoreSettings.SetSecretKey(importInfo->Settings.secret_key());
- restoreSettings.SetObjectKeyPattern(importInfo->Settings.items(itemIdx).source_prefix());
+ restoreSettings.SetObjectKeyPattern(importInfo->GetItemSrcPrefix(itemIdx));
restoreSettings.SetUseVirtualAddressing(!importInfo->Settings.disable_virtual_addressing());
switch (importInfo->Settings.scheme()) {
@@ -338,7 +346,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
auto* tabletConfig = pqGroup.MutablePQTabletConfig();
const auto& pqConfig = AppData()->PQConfig;
-
+
for (const auto& consumer : topic.consumers()) {
auto& addedConsumer = *tabletConfig->AddConsumers();
auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig);
@@ -347,7 +355,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
addedConsumer.SetImportant(true);
}
}
-
+
return propose;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
index e09e9efadc..139e3c8dfc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
@@ -3,6 +3,7 @@
#include "schemeshard_private.h"
#include <ydb/core/backup/common/checksum.h>
+#include <ydb/core/backup/common/encryption.h>
#include <ydb/core/backup/common/metadata.h>
#include <ydb/core/wrappers/s3_storage_config.h>
#include <ydb/core/wrappers/s3_wrapper.h>
@@ -12,6 +13,8 @@
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
+#include <library/cpp/json/json_reader.h>
+
#include <google/protobuf/text_format.h>
#include <util/string/subst.h>
@@ -26,31 +29,173 @@ using namespace Aws::Client;
using namespace Aws::S3;
using namespace Aws;
+static constexpr TDuration MaxDelay = TDuration::Minutes(10);
+
+template <class TDerived>
+class TGetterFromS3 : public TActorBootstrapped<TDerived> {
+protected:
+ explicit TGetterFromS3(TImportInfo::TPtr importInfo, TMaybe<NBackup::TEncryptionIV> iv)
+ : ImportInfo(std::move(importInfo))
+ , ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(ImportInfo->Settings))
+ , IV(std::move(iv))
+ , Retries(ImportInfo->Settings.number_of_retries())
+ {
+ if (ImportInfo->Settings.has_encryption_settings()) {
+ Key = NBackup::TEncryptionKey(ImportInfo->Settings.encryption_settings().symmetric_key().key());
+ }
+ }
+
+ void HeadObject(const TString& key, bool autoAddEncSuffix = true) {
+ auto request = Model::HeadObjectRequest()
+ .WithKey(GetKey(key, autoAddEncSuffix));
+
+ this->Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
+ }
+
+ void GetObject(const TString& key, const std::pair<ui64, ui64>& range, bool autoAddEncSuffix = true) {
+ auto request = Model::GetObjectRequest()
+ .WithKey(GetKey(key, autoAddEncSuffix))
+ .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
+
+ this->Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request));
+ }
+
+ void GetObject(const TString& key, ui64 contentLength, bool autoAddEncSuffix = true) {
+ GetObject(key, std::make_pair(0, contentLength - 1), autoAddEncSuffix);
+ }
+
+ void ListObjects(const TString& prefix) {
+ auto request = Model::ListObjectsRequest()
+ .WithPrefix(prefix);
+
+ this->Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request));
+ }
+
+ void Download(const TString& key, bool autoAddEncSuffix = true) {
+ CreateClient();
+ HeadObject(key, autoAddEncSuffix);
+ }
+
+ void CreateClient() {
+ if (Client) {
+ this->Send(Client, new TEvents::TEvPoisonPill());
+ }
+ Client = this->RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
+ }
+
+ void PassAway() override {
+ this->Send(Client, new TEvents::TEvPoisonPill());
+ TActorBootstrapped<TDerived>::PassAway();
+ }
+
+ TString GetKey(TString key, bool autoAddEncSuffix = true) {
+ if (autoAddEncSuffix && ImportInfo->Settings.has_encryption_settings()) {
+ key += ".enc";
+ }
+ return key;
+ }
+
+ template <typename TResult>
+ bool CheckResult(const TResult& result, const TStringBuf marker) {
+ if (result.IsSuccess()) {
+ return true;
+ }
+
+ LOG_E("Error at '" << marker << "'"
+ << ": self# " << this->SelfId()
+ << ", error# " << result);
+ MaybeRetry(result.GetError());
+
+ return false;
+ }
+
+ void MaybeRetry(const Aws::S3::S3Error& error) {
+ if (Attempt < Retries && error.ShouldRetry()) {
+ Delay = Min(Delay * ++Attempt, MaxDelay);
+ this->Schedule(Delay, new TEvents::TEvWakeup());
+ } else {
+ Reply(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
+ }
+ }
+
+ void ResetRetries() {
+ Attempt = 0;
+ }
+
+ // If export is encrypted, decrypts and gets export IV,
+ // else returns true
+ bool MaybeDecryptAndSaveIV(const TString& content, TString& result) {
+ if (Key) {
+ try {
+ auto [buffer, iv] = NBackup::TEncryptedFileDeserializer::DecryptFullFile(*Key, TBuffer(content.data(), content.size()));
+ IV = iv;
+ result.assign(buffer.Data(), buffer.Size());
+ return true;
+ } catch (const std::exception& ex) {
+ Reply(false, ex.what());
+ return false;
+ }
+ }
+ result = content;
+ return true;
+ }
+
+ bool MaybeDecrypt(const TString& content, TString& result, NBackup::EBackupFileType fileType) {
+ if (Key && IV) {
+ try {
+ NBackup::TEncryptionIV expectedIV = NBackup::TEncryptionIV::Combine(*IV, fileType, 0 /* already combined */, 0);
+ auto buffer = NBackup::TEncryptedFileDeserializer::DecryptFullFile(*Key, expectedIV, TBuffer(content.data(), content.size()));
+ result.assign(buffer.Data(), buffer.Size());
+ return true;
+ } catch (const std::exception& ex) {
+ Reply(false, ex.what());
+ return false;
+ }
+ }
+ result = content;
+ return true;
+ }
+
+ virtual void Reply(bool success = true, const TString& error = TString()) = 0;
+
+protected:
+ TImportInfo::TPtr ImportInfo;
+ NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
+ TActorId Client;
+ TMaybe<NBackup::TEncryptionKey> Key;
+ TMaybe<NBackup::TEncryptionIV> IV;
+
+ const ui32 Retries;
+ ui32 Attempt = 0;
+
+ TDuration Delay = TDuration::Minutes(1);
+};
+
// Downloads scheme-related objects from S3
-class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
- static TString MetadataKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
- return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json";
+class TSchemeGetter: public TGetterFromS3<TSchemeGetter> {
+ static TString MetadataKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/metadata.json";
}
- static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, TStringBuf filename) {
- Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
- return TStringBuilder() << settings.items(itemIdx).source_prefix() << '/' << filename;
+ static TString SchemeKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, TStringBuf filename) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << '/' << filename;
}
- static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
- return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb";
+ static TString PermissionsKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/permissions.pb";
}
- static TString ChangefeedDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
- Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
- return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb";
+ static TString ChangefeedDescriptionKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, const TString& changefeedName) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/" << changefeedName << "/changefeed_description.pb";
}
- static TString TopicDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
- Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
- return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/topic_description.pb";
+ static TString TopicDescriptionKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, const TString& changefeedName) {
+ Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
+ return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/" << changefeedName << "/topic_description.pb";
}
static bool IsView(TStringBuf schemeKey) {
@@ -61,13 +206,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
}
- void HeadObject(const TString& key) {
- auto request = Model::HeadObjectRequest()
- .WithKey(key);
-
- Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
- }
-
void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;
@@ -79,8 +217,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
- GetObject(MetadataKey, std::make_pair(0, contentLength - 1));
+ GetObject(MetadataKey, result.GetResult().GetContentLength());
}
void HandleScheme(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -92,7 +229,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
if (!IsView(SchemeKey) && NoObjectFound(result.GetError().GetErrorType())) {
// try search for a view
- SchemeKey = SchemeKeyFromSettings(ImportInfo->Settings, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName);
+ SchemeKey = SchemeKeyFromSettings(ImportInfo, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName);
HeadObject(SchemeKey);
return;
}
@@ -101,8 +238,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
- GetObject(SchemeKey, std::make_pair(0, contentLength - 1));
+ GetObject(SchemeKey, result.GetResult().GetContentLength());
}
void HandlePermissions(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -119,8 +255,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
- GetObject(PermissionsKey, std::make_pair(0, contentLength - 1));
+ GetObject(PermissionsKey, result.GetResult().GetContentLength());
}
void HandleChecksum(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -134,8 +269,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
- GetObject(NBackup::ChecksumKey(CurrentObjectKey), std::make_pair(0, contentLength - 1));
+ GetObject(NBackup::ChecksumKey(CurrentObjectKey), result.GetResult().GetContentLength(), false);
}
void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -149,9 +283,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
- GetObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
+ GetObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), result.GetResult().GetContentLength());
}
void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -165,17 +298,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
- const auto contentLength = result.GetResult().GetContentLength();
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
- GetObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
- }
-
- void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
- auto request = Model::GetObjectRequest()
- .WithKey(key)
- .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
-
- Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request));
+ GetObject(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), result.GetResult().GetContentLength());
}
void HandleMetadata(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
@@ -190,14 +314,19 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::Metadata)) {
+ return;
+ }
+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);
LOG_T("Trying to parse metadata"
<< ": self# " << SelfId()
- << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
- item.Metadata = NBackup::TMetadata::Deserialize(msg.Body);
+ item.Metadata = NBackup::TMetadata::Deserialize(content);
if (item.Metadata.HasVersion() && item.Metadata.GetVersion() == 0) {
NeedValidateChecksums = false;
@@ -208,7 +337,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
};
if (NeedValidateChecksums) {
- StartValidatingChecksum(MetadataKey, msg.Body, nextStep);
+ StartValidatingChecksum(MetadataKey, content, nextStep);
} else {
nextStep();
}
@@ -226,6 +355,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableSchema)) {
+ return;
+ }
+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);
@@ -233,11 +367,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
<< ": self# " << SelfId()
<< ", itemIdx# " << ItemIdx
<< ", schemeKey# " << SchemeKey
- << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
if (IsView(SchemeKey)) {
- item.CreationQuery = msg.Body;
- } else if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) {
+ item.CreationQuery = content;
+ } else if (!google::protobuf::TextFormat::ParseFromString(content, &item.Scheme)) {
return Reply(false, "Cannot parse scheme");
}
@@ -250,7 +384,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
};
if (NeedValidateChecksums) {
- StartValidatingChecksum(SchemeKey, msg.Body, nextStep);
+ StartValidatingChecksum(SchemeKey, content, nextStep);
} else {
nextStep();
}
@@ -268,15 +402,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::Permissions)) {
+ return;
+ }
+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);
LOG_T("Trying to parse permissions"
<< ": self# " << SelfId()
- << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
Ydb::Scheme::ModifyPermissionsRequest permissions;
- if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &permissions)) {
+ if (!google::protobuf::TextFormat::ParseFromString(content, &permissions)) {
return Reply(false, "Cannot parse permissions");
}
item.Permissions = std::move(permissions);
@@ -286,7 +425,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
};
if (NeedValidateChecksums) {
- StartValidatingChecksum(PermissionsKey, msg.Body, nextStep);
+ StartValidatingChecksum(PermissionsKey, content, nextStep);
} else {
nextStep();
}
@@ -326,15 +465,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableChangefeed)) {
+ return;
+ }
+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);
LOG_T("Trying to parse changefeed"
<< ": self# " << SelfId()
- << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
Ydb::Table::ChangefeedDescription changefeed;
- if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) {
+ if (!google::protobuf::TextFormat::ParseFromString(content, &changefeed)) {
return Reply(false, "Cannot parse сhangefeed");
}
@@ -342,14 +486,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
auto nextStep = [this]() {
Become(&TThis::StateDownloadTopics);
- HeadObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
+ HeadObject(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
};
if (NeedValidateChecksums) {
- StartValidatingChecksum(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep);
+ StartValidatingChecksum(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), content, nextStep);
} else {
nextStep();
- }
+ }
}
void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
@@ -364,15 +508,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return;
}
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableTopic)) {
+ return;
+ }
+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);
LOG_T("Trying to parse topic"
<< ": self# " << SelfId()
- << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
Ydb::Topic::DescribeTopicResult topic;
- if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) {
+ if (!google::protobuf::TextFormat::ParseFromString(content, &topic)) {
return Reply(false, "Cannot parse topic");
}
*item.Changefeeds.MutableChangefeeds(IndexDownloadedChangefeed)->MutableTopic() = std::move(topic);
@@ -382,22 +531,15 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Reply();
} else {
Become(&TThis::StateDownloadChangefeeds);
- HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
+ HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
}
};
if (NeedValidateChecksums) {
- StartValidatingChecksum(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep);
+ StartValidatingChecksum(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), content, nextStep);
} else {
nextStep();
- }
- }
-
- void ListObjects(const TString& prefix) {
- auto request = Model::ListObjectsRequest()
- .WithPrefix(prefix);
-
- Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request));
+ }
}
template <typename T>
@@ -431,37 +573,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Resize(item.Changefeeds.MutableChangefeeds(), ChangefeedsNames.size());
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
- HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
+ HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
} else {
Reply();
}
}
- template <typename TResult>
- bool CheckResult(const TResult& result, const TStringBuf marker) {
- if (result.IsSuccess()) {
- return true;
- }
-
- LOG_E("Error at '" << marker << "'"
- << ": self# " << SelfId()
- << ", error# " << result);
- MaybeRetry(result.GetError());
-
- return false;
- }
-
- void MaybeRetry(const Aws::S3::S3Error& error) {
- if (Attempt < Retries && error.ShouldRetry()) {
- Delay = Min(Delay * ++Attempt, MaxDelay);
- Schedule(Delay, new TEvents::TEvWakeup());
- } else {
- Reply(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
- }
- }
-
- void Reply(bool success = true, const TString& error = TString()) {
+ void Reply(bool success = true, const TString& error = TString()) override {
LOG_I("Reply"
<< ": self# " << SelfId()
<< ", success# " << success
@@ -471,26 +590,9 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
PassAway();
}
- void PassAway() override {
- Send(Client, new TEvents::TEvPoisonPill());
- TActor::PassAway();
- }
-
- void CreateClient() {
- if (Client) {
- Send(Client, new TEvents::TEvPoisonPill());
- }
- Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
- }
-
void ListChangefeeds() {
CreateClient();
- ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix());
- }
-
- void Download(const TString& key) {
- CreateClient();
- HeadObject(key);
+ ListObjects(ImportInfo->GetItemSrcPrefix(ItemIdx));
}
void DownloadMetadata() {
@@ -506,7 +608,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
}
void DownloadChecksum() {
- Download(NBackup::ChecksumKey(CurrentObjectKey));
+ Download(NBackup::ChecksumKey(CurrentObjectKey), false);
}
void DownloadChangefeeds() {
@@ -514,10 +616,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
ListChangefeeds();
}
- void ResetRetries() {
- Attempt = 0;
- }
-
void StartDownloadingScheme() {
ResetRetries();
DownloadScheme();
@@ -546,17 +644,15 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
}
public:
- explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx)
- : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings))
+ explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv)
+ : TGetterFromS3<TSchemeGetter>(std::move(importInfo), std::move(iv))
, ReplyTo(replyTo)
- , ImportInfo(importInfo)
, ItemIdx(itemIdx)
- , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx))
- , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx, "scheme.pb"))
- , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx))
- , Retries(importInfo->Settings.number_of_retries())
- , NeedDownloadPermissions(!importInfo->Settings.no_acl())
- , NeedValidateChecksums(!importInfo->Settings.skip_checksum_validation())
+ , MetadataKey(MetadataKeyFromSettings(ImportInfo, itemIdx))
+ , SchemeKey(SchemeKeyFromSettings(ImportInfo, itemIdx, "scheme.pb"))
+ , PermissionsKey(PermissionsKeyFromSettings(ImportInfo, itemIdx))
+ , NeedDownloadPermissions(!ImportInfo->Settings.no_acl())
+ , NeedValidateChecksums(!ImportInfo->Settings.skip_checksum_validation())
{
}
@@ -627,9 +723,7 @@ public:
}
private:
- NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
const TActorId ReplyTo;
- TImportInfo::TPtr ImportInfo;
const ui32 ItemIdx;
const TString MetadataKey;
@@ -638,16 +732,8 @@ private:
TVector<TString> ChangefeedsNames;
ui64 IndexDownloadedChangefeed = 0;
- const ui32 Retries;
- ui32 Attempt = 0;
-
- TDuration Delay = TDuration::Minutes(1);
- static constexpr TDuration MaxDelay = TDuration::Minutes(10);
-
const bool NeedDownloadPermissions = true;
- TActorId Client;
-
bool NeedValidateChecksums = true;
TString CurrentObjectChecksum;
@@ -655,8 +741,192 @@ private:
std::function<void()> ChecksumValidatedCallback;
}; // TSchemeGetter
-IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) {
- return new TSchemeGetter(replyTo, importInfo, itemIdx);
+class TSchemaMappingGetter : public TGetterFromS3<TSchemaMappingGetter> {
+ static TString SchemaMappingKeyFromSettings(const TImportInfo::TPtr& importInfo) {
+ return TStringBuilder() << importInfo->Settings.source_prefix() << "/SchemaMapping/mapping.json";
+ }
+
+ static TString SchemaMappingMetadataKeyFromSettings(const TImportInfo::TPtr& importInfo) {
+ return TStringBuilder() << importInfo->Settings.source_prefix() << "/SchemaMapping/metadata.json";
+ }
+
+ void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ LOG_D("HandleMetadata TEvExternalStorage::TEvHeadObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "HeadObject")) {
+ return;
+ }
+
+ GetObject(MetadataKey, result.GetResult().GetContentLength());
+ }
+
+ void HandleSchemaMapping(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ LOG_D("HandleSchemaMapping TEvExternalStorage::TEvHeadObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "HeadObject")) {
+ return;
+ }
+
+ GetObject(SchemaMappingKey, result.GetResult().GetContentLength());
+ }
+
+ void HandleMetadata(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
+ const auto& msg = *ev->Get();
+ const auto& result = msg.Result;
+
+ LOG_D("HandleMetadata TEvExternalStorage::TEvGetObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "GetObject")) {
+ return;
+ }
+
+ TString content;
+ if (!MaybeDecryptAndSaveIV(msg.Body, content)) {
+ return;
+ }
+ ImportInfo->ExportIV = IV;
+
+ LOG_T("Trying to parse metadata"
+ << ": self# " << SelfId()
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
+
+ if (!ProcessMetadata(content)) {
+ return;
+ }
+
+ auto nextStep = [this]() {
+ StartDownloadingSchemaMapping();
+ };
+
+ nextStep();
+ }
+
+ void HandleSchemaMapping(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
+ const auto& msg = *ev->Get();
+ const auto& result = msg.Result;
+
+ LOG_D("HandleSchemaMapping TEvExternalStorage::TEvGetObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "GetObject")) {
+ return;
+ }
+
+ TString content;
+ if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::SchemaMapping)) {
+ return;
+ }
+
+ LOG_T("Trying to parse scheme"
+ << ": self# " << SelfId()
+ << ", schemaMappingKey# " << SchemaMappingKey
+ << ", body# " << SubstGlobalCopy(content, "\n", "\\n"));
+
+ ImportInfo->SchemaMapping.ConstructInPlace();
+ TString error;
+ if (!ImportInfo->SchemaMapping->Deserialize(content, error)) {
+ Reply(false, error);
+ return;
+ }
+
+ Reply();
+ }
+
+ void Reply(bool success = true, const TString& error = TString()) override {
+ LOG_I("Reply"
+ << ": self# " << SelfId()
+ << ", success# " << success
+ << ", error# " << error);
+
+ Send(ReplyTo, new TEvPrivate::TEvImportSchemaMappingReady(ImportInfo->Id, success, error));
+ PassAway();
+ }
+
+ void DownloadMetadata() {
+ Download(MetadataKey);
+ }
+
+ void DownloadSchemaMapping() {
+ Download(SchemaMappingKey);
+ }
+
+ void StartDownloadingSchemaMapping() {
+ ResetRetries();
+ DownloadSchemaMapping();
+ Become(&TThis::StateDownloadSchemaMapping);
+ }
+
+ bool ProcessMetadata(const TString& content) {
+ NJson::TJsonValue json;
+ if (!NJson::ReadJsonTree(content, &json)) {
+ Reply(false, "Failed to parse metadata json");
+ return false;
+ }
+ const NJson::TJsonValue& kind = json["kind"];
+ if (kind.GetString() != "SchemaMappingV0") {
+ Reply(false, TStringBuilder() << "Unknown kind of metadata json: " << kind.GetString());
+ return false;
+ }
+ return true;
+ }
+
+public:
+ TSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo)
+ : TGetterFromS3<TSchemaMappingGetter>(std::move(importInfo), Nothing())
+ , ReplyTo(replyTo)
+ , MetadataKey(SchemaMappingMetadataKeyFromSettings(ImportInfo))
+ , SchemaMappingKey(SchemaMappingKeyFromSettings(ImportInfo))
+ {
+ }
+
+ void Bootstrap() {
+ DownloadMetadata();
+ Become(&TThis::StateDownloadMetadata);
+ }
+
+ STATEFN(StateDownloadMetadata) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleMetadata);
+ hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleMetadata);
+
+ sFunc(TEvents::TEvWakeup, DownloadMetadata);
+ sFunc(TEvents::TEvPoisonPill, PassAway);
+ }
+ }
+
+ STATEFN(StateDownloadSchemaMapping) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleSchemaMapping);
+ hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleSchemaMapping);
+
+ sFunc(TEvents::TEvWakeup, DownloadSchemaMapping);
+ sFunc(TEvents::TEvPoisonPill, PassAway);
+ }
+ }
+
+private:
+ const TActorId ReplyTo;
+ const TString MetadataKey;
+ const TString SchemaMappingKey;
+}; // TSchemaMappingGetter
+
+IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv) {
+ return new TSchemeGetter(replyTo, std::move(importInfo), itemIdx, std::move(iv));
+}
+
+IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) {
+ return new TSchemaMappingGetter(replyTo, std::move(importInfo));
}
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h
index 24b67c7c65..32b108c20e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h
@@ -6,7 +6,9 @@
namespace NKikimr {
namespace NSchemeShard {
-IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx);
+IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv);
+
+IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo);
} // NSchemeShard
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp
index 9c3c9f91fb..f3b6aee836 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp
@@ -25,11 +25,32 @@ private:
const TActorId ReplyTo;
TImportInfo::TPtr ImportInfo;
const ui32 ItemIdx;
+}; // TSchemeGetterFallback
+
+class TSchemaMappingGetterFallback: public TActorBootstrapped<TSchemaMappingGetterFallback> {
+public:
+ explicit TSchemaMappingGetterFallback(const TActorId& replyTo, TImportInfo::TPtr importInfo)
+ : ReplyTo(replyTo)
+ , ImportInfo(importInfo)
+ {
+ }
+
+ void Bootstrap() {
+ Send(ReplyTo, new TEvPrivate::TEvImportSchemaMappingReady(ImportInfo->Id, false, "Imports from S3 are disabled"));
+ PassAway();
+ }
+private:
+ const TActorId ReplyTo;
+ TImportInfo::TPtr ImportInfo;
}; // TSchemeGetterFallback
-IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) {
- return new TSchemeGetterFallback(replyTo, importInfo, itemIdx);
+IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV>) {
+ return new TSchemeGetterFallback(replyTo, std::move(importInfo), itemIdx);
+}
+
+IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) {
+ return new TSchemaMappingGetterFallback(replyTo, std::move(importInfo));
}
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 351a21adf0..7e0879ea53 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -12,6 +12,7 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/control/lib/immediate_control_board_impl.h>
+#include <ydb/core/backup/common/encryption.h>
#include <ydb/core/backup/common/metadata.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/base/table_vector_index.h>
@@ -2846,12 +2847,13 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
enum class EState: ui8 {
Invalid = 0,
- Waiting,
- GetScheme,
- CreateSchemeObject,
- Transferring,
- BuildIndexes,
- CreateChangefeed,
+ Waiting = 1,
+ GetScheme = 2,
+ CreateSchemeObject = 3,
+ Transferring = 4,
+ BuildIndexes = 5,
+ CreateChangefeed = 6,
+ DownloadExportMetadata = 7,
Done = 240,
Cancellation = 250,
Cancelled = 251,
@@ -2875,6 +2877,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TString DstPathName;
TPathId DstPathId;
+ TString SrcPrefix;
Ydb::Table::CreateTableRequest Scheme;
TString CreationQuery;
TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery;
@@ -2892,6 +2895,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
int NextChangefeedIdx = 0;
TString Issue;
TPathId StreamImplPathId;
+ TMaybe<NBackup::TEncryptionIV> ExportItemIV;
TItem() = default;
@@ -2918,6 +2922,9 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TPathId DomainPathId;
TMaybe<TString> UserSID;
TString PeerName; // required for making audit log records
+ TMaybe<NBackup::TEncryptionIV> ExportIV;
+ TMaybe<NBackup::TSchemaMapping> SchemaMapping;
+ TActorId SchemaMappingGetter;
EState State = EState::Invalid;
TString Issue;
@@ -2929,6 +2936,20 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TInstant StartTime = TInstant::Zero();
TInstant EndTime = TInstant::Zero();
+ TString GetItemSrcPrefix(size_t i) const {
+ if (i < Items.size() && Items[i].SrcPrefix) {
+ return Items[i].SrcPrefix;
+ }
+
+ // Backward compatibility.
+ // But there can be no paths in settings at all.
+ if (i < ui32(Settings.items_size())) {
+ return Settings.items(i).source_prefix();
+ }
+
+ return {};
+ }
+
explicit TImportInfo(
const ui64 id,
const TString& uid,
@@ -3195,8 +3216,8 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
Parent = ParentEnd();
}
- void Set(ui32 level,
- NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent,
+ void Set(ui32 level,
+ NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent,
NTableIndex::TClusterId childBegin, NTableIndex::TClusterId child,
ui32 state, ui64 tableSize) {
Level = level;
@@ -3665,7 +3686,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
TSerializedTableRange bound{range};
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::BUILD_INDEX,
- "AddShardStatus id# " << Id << " shard " << shardIdx <<
+ "AddShardStatus id# " << Id << " shard " << shardIdx <<
" range " << KMeans.RangeToDebugStr(bound, IsBuildPrefixedVectorIndex() ? 2 : 1));
AddParent(bound, shardIdx);
Shards.emplace(
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index 1e653640ce..d6bcba2d81 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -20,6 +20,7 @@ namespace TEvPrivate {
EvRunConditionalErase,
EvIndexBuildBilling,
EvImportSchemeReady,
+ EvImportSchemaMappingReady,
EvImportSchemeQueryResult,
EvExportSchemeUploadResult,
EvExportUploadMetadataResult,
@@ -105,6 +106,18 @@ namespace TEvPrivate {
{}
};
+ struct TEvImportSchemaMappingReady: public TEventLocal<TEvImportSchemaMappingReady, EvImportSchemaMappingReady> {
+ const ui64 ImportId;
+ const bool Success;
+ const TString Error;
+
+ TEvImportSchemaMappingReady(ui64 id, bool success, const TString& error)
+ : ImportId(id)
+ , Success(success)
+ , Error(error)
+ {}
+ };
+
struct TEvImportSchemeQueryResult: public TEventLocal<TEvImportSchemeQueryResult, EvImportSchemeQueryResult> {
const ui64 ImportId;
const ui32 ItemIdx;
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index baa342f150..4c1c693933 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1596,6 +1596,8 @@ struct Schema : NIceDb::Schema {
struct NextIndexIdx : Column<9, NScheme::NTypeIds::Uint32> {};
struct NextChangefeedIdx : Column<16, NScheme::NTypeIds::Uint32> {};
struct Issue : Column<10, NScheme::NTypeIds::Utf8> {};
+ struct SrcPrefix : Column<17, NScheme::NTypeIds::Utf8> {};
+ struct EncryptionIV : Column<18, NScheme::NTypeIds::String> {};
using TKey = TableKey<ImportId, Index>;
using TColumns = TableColumns<
@@ -1614,7 +1616,9 @@ struct Schema : NIceDb::Schema {
WaitTxId,
NextIndexIdx,
NextChangefeedIdx,
- Issue
+ Issue,
+ SrcPrefix,
+ EncryptionIV
>;
};
diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
index 676d59c9d7..0693679da5 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
@@ -1564,9 +1564,10 @@ value {
}
}
- Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
+ void ExportImportOnSupportedDatatypesImpl(bool encrypted, bool commonPrefix) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableParameterizedDecimal(true));
+ runtime.GetAppData().FeatureFlags.SetEnableEncryptedExport(true);
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"_(
@@ -1656,16 +1657,51 @@ value {
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());
+ TString encryptionSettings;
+ if (encrypted) {
+ encryptionSettings = R"(encryption_settings {
+ encryption_algorithm: "ChaCha20-Poly1305"
+ symmetric_key {
+ key: "Very very secret export key!!!!!"
+ }
+ })";
+ }
+ TString exportItems, importItems;
+ if (commonPrefix) {
+ exportItems = R"(
+ source_path: "/MyRoot"
+ destination_prefix: "BackupPrefix"
+ items {
+ source_path: "Table"
+ }
+ )";
+ importItems = R"(
+ source_prefix: "BackupPrefix"
+ destination_path: "/MyRoot/Restored"
+ )";
+ } else {
+ exportItems = R"(
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: "Backup1"
+ }
+ )";
+ importItems = R"(
+ items {
+ source_prefix: "Backup1"
+ destination_path: "/MyRoot/Restored"
+ }
+ )";
+ }
+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: "Backup1"
- }
+ %s
+ %s
}
- )", port));
+ )", port, exportItems.c_str(), encryptionSettings.c_str()));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");
@@ -1673,12 +1709,10 @@ value {
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
- items {
- source_prefix: "Backup1"
- destination_path: "/MyRoot/Restored"
- }
+ %s
+ %s
}
- )", port));
+ )", port, importItems.c_str(), encryptionSettings.c_str()));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");
@@ -1744,10 +1778,22 @@ value {
auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns);
NKqp::CompareYson(expectedJson, contentOriginalTable);
- auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", readKeyDesc, readColumns);
+ auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, commonPrefix ? "Table" : "Restored", readKeyDesc, readColumns);
NKqp::CompareYson(expectedJson, contentRestoredTable);
}
+ Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
+ ExportImportOnSupportedDatatypesImpl(false, false);
+ }
+
+ Y_UNIT_TEST(ExportImportOnSupportedDatatypesWithCommonDestPrefix) {
+ ExportImportOnSupportedDatatypesImpl(false, true);
+ }
+
+ Y_UNIT_TEST(ExportImportOnSupportedDatatypesEncrypted) {
+ ExportImportOnSupportedDatatypesImpl(true, true);
+ }
+
Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index d9dea01aab..f76ea2e948 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6259,6 +6259,16 @@
"ColumnId": 16,
"ColumnName": "NextChangefeedIdx",
"ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 17,
+ "ColumnName": "SrcPrefix",
+ "ColumnType": "Utf8"
+ },
+ {
+ "ColumnId": 18,
+ "ColumnName": "EncryptionIV",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -6280,7 +6290,9 @@
13,
14,
15,
- 16
+ 16,
+ 17,
+ 18
],
"RoomID": 0,
"Codec": 0,