diff options
author | Vasily Gerasimov <UgnineSirdis@ydb.tech> | 2025-03-31 21:26:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-31 21:26:02 +0300 |
commit | 4ae39ac0db24e7564f13a3d0545a540bd647283c (patch) | |
tree | 7eb2cb7c2312f2910191011e0c0baa0a1c38e95b | |
parent | ba56e6037c80d57513c9acc90f517e7c0e44ce4c (diff) | |
download | ydb-4ae39ac0db24e7564f13a3d0545a540bd647283c.tar.gz |
Implement encrypted import (#16434)
22 files changed, 931 insertions, 242 deletions
diff --git a/ydb/core/backup/common/metadata.cpp b/ydb/core/backup/common/metadata.cpp index fd70983aca..d54b879861 100644 --- a/ydb/core/backup/common/metadata.cpp +++ b/ydb/core/backup/common/metadata.cpp @@ -53,4 +53,48 @@ TMetadata TMetadata::Deserialize(const TString& metadata) { return result; } +bool TSchemaMapping::Deserialize(const TString& jsonContent, TString& error) { + NJson::TJsonValue json; + if (!NJson::ReadJsonTree(jsonContent, &json)) { + error = "Failed to parse schema mapping json"; + return false; + } + const NJson::TJsonValue& mapping = json["exportedObjects"]; + if (!mapping.IsMap()) { + error = "No mapping in mapping json"; + return false; + } + + bool hasIV = false; + bool first = true; + for (const auto& [exportObject, info] : mapping.GetMap()) { + const NJson::TJsonValue& exportPrefix = info["exportPrefix"]; + const NJson::TJsonValue& iv = info["iv"]; + if (!exportPrefix.IsString()) { + error = "Incorrect exportPrefix"; + return false; + } + if (first) { + hasIV = info.Has("iv"); + } else { + first = false; + if (hasIV != info.Has("iv")) { + error = "Incorrect iv in schema mapping json"; + return false; + } + } + if (hasIV && !iv.IsString()) { + error = "IV in schema mapping json is not a string"; + return false; + } + TItem& item = Items.emplace_back(); + item.ExportPrefix = exportPrefix.GetString(); + item.ObjectPath = exportObject; + if (hasIV) { + item.IV = TEncryptionIV::FromHexString(iv.GetString()); + } + } + return true; +} + } diff --git a/ydb/core/backup/common/metadata.h b/ydb/core/backup/common/metadata.h index f92e5e4fd6..7479f7ee41 100644 --- a/ydb/core/backup/common/metadata.h +++ b/ydb/core/backup/common/metadata.h @@ -1,4 +1,5 @@ #pragma once +#include "encryption.h" #include <ydb/core/base/row_version.h> @@ -56,4 +57,20 @@ private: TMaybeFail<ui64> Version; }; +class TSchemaMapping { +public: + struct TItem { + TString ExportPrefix; + TString ObjectPath; + TMaybe<NBackup::TEncryptionIV> IV; + }; + + TSchemaMapping() = default; + + bool Deserialize(const TString& jsonContent, TString& error); + +public: + std::vector<TItem> Items; +}; + } // namespace NKikimr::NBackup diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 9d0f3fdfbe..e048e9051a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1268,6 +1268,15 @@ message TRestoreTask { } optional bool ValidateChecksums = 7; // currently available for s3 + + message TEncryptionSettings { + optional bytes IV = 1; + oneof Key { + Ydb.Export.EncryptionSettings.SymmetricKey SymmetricKey = 2; + } + } + + optional TEncryptionSettings EncryptionSettings = 8; } message TPersQueueGroupAllocate { diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index 1dc2c6afed..bd84fd91ef 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -25,21 +25,11 @@ public: const TMaybe<NBackup::TEncryptionIV> IV; static TEncryptionSettings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) { - if (task.HasEncryptionSettings()) { - return TEncryptionSettings{ - .EncryptedBackup = true, - .EncryptionAlgorithm = task.GetEncryptionSettings().GetEncryptionAlgorithm(), - .Key = NBackup::TEncryptionKey(task.GetEncryptionSettings().GetSymmetricKey().key()), - .IV = NBackup::TEncryptionIV::FromBinaryString(task.GetEncryptionSettings().GetIV()), - }; - } else { - return TEncryptionSettings{ - .EncryptedBackup = false, - .EncryptionAlgorithm = {}, - .Key = Nothing(), - .IV = Nothing(), - }; - } + return FromProto(task); + } + + static TEncryptionSettings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) { + return FromProto(task); } TMaybe<NBackup::TEncryptionIV> GetMetadataIV() const { @@ -70,6 +60,29 @@ public: } return iv; } + + template <class TProto> + static TEncryptionSettings FromProto(const TProto& task) { + if (task.HasEncryptionSettings()) { + TString algorithm; + if constexpr (std::is_same_v<TProto, NKikimrSchemeOp::TBackupTask>) { + algorithm = task.GetEncryptionSettings().GetEncryptionAlgorithm(); + } + return TEncryptionSettings{ + .EncryptedBackup = true, + .EncryptionAlgorithm = algorithm, + .Key = NBackup::TEncryptionKey(task.GetEncryptionSettings().GetSymmetricKey().key()), + .IV = NBackup::TEncryptionIV::FromBinaryString(task.GetEncryptionSettings().GetIV()), + }; + } else { + return TEncryptionSettings{ + .EncryptedBackup = false, + .EncryptionAlgorithm = {}, + .Key = Nothing(), + .IV = Nothing(), + }; + } + } }; public: const TString Bucket; @@ -93,7 +106,7 @@ public: } static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) { - return TS3Settings(task.GetS3Settings(), task.GetShardNum(), TEncryptionSettings{}); + return TS3Settings(task.GetS3Settings(), task.GetShardNum(), TEncryptionSettings::FromRestoreTask(task)); } inline const TString& GetBucket() const { return Bucket; } diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index aa90e9da6f..035e5230ba 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -488,17 +488,34 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec)); } + THolder<IReadController> reader; switch (CompressionCodec) { case NBackupRestoreTraits::ECompressionCodec::None: - Reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit)); + reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit)); break; case NBackupRestoreTraits::ECompressionCodec::Zstd: - Reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit)); + reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit)); break; case NBackupRestoreTraits::ECompressionCodec::Invalid: Y_ENSURE(false, "unreachable"); } + if (Settings.EncryptionSettings.EncryptedBackup) { + NBackup::TEncryptionIV expectedIV = NBackup::TEncryptionIV::Combine( + *Settings.EncryptionSettings.IV, + NBackup::EBackupFileType::TableData, + 0 /* already combined */, + Settings.Shard + ); + Reader = MakeHolder<TEncryptionDeserializerController>( + *Settings.EncryptionSettings.Key, + expectedIV, + std::move(reader) + ); + } else { + Reader = std::move(reader); + } + ETag = result.GetResult().GetETag(); ContentLength = result.GetResult().GetContentLength(); diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index c97a392a51..1b346d40ee 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4431,6 +4431,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } switch (importInfo->State) { + case TImportInfo::EState::DownloadExportMetadata: case TImportInfo::EState::Waiting: case TImportInfo::EState::Cancellation: ImportsToResume.push_back(importInfo->Id); @@ -4507,6 +4508,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0); item.NextChangefeedIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextChangefeedIdx>(0); item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString()); + item.SrcPrefix = rowset.GetValueOrDefault<Schema::ImportItems::SrcPrefix>(TString()); + if (rowset.HaveValue<Schema::ImportItems::EncryptionIV>()) { + item.ExportItemIV = NBackup::TEncryptionIV::FromBinaryString(rowset.GetValue<Schema::ImportItems::EncryptionIV>()); + } if (item.WaitTxId != InvalidTxId) { Self->TxIdToImport[item.WaitTxId] = {importId, itemIdx}; diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 099c564a3e..ea97b3310e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -209,12 +209,25 @@ private: return true; } + TString GetCommonSourcePath(const Ydb::Export::ExportToS3Settings& settings) { + return settings.source_path(); + } + + TString GetCommonSourcePath(const Ydb::Export::ExportToYtSettings&) { + return {}; + } + template <typename TSettings> bool FillItems(TExportInfo::TPtr exportInfo, const TSettings& settings, TString& explain) { + TString commonSourcePath = GetCommonSourcePath(settings); + if (commonSourcePath && commonSourcePath.back() != '/') { + commonSourcePath.push_back('/'); + } exportInfo->Items.reserve(settings.items().size()); for (ui32 itemIdx : xrange(settings.items().size())) { const auto& item = settings.items(itemIdx); - const TPath path = TPath::Resolve(item.source_path(), Self); + const TString srcPath = commonSourcePath + item.source_path(); + const TPath path = TPath::Resolve(srcPath, Self); { TPath::TChecker checks = path.Check(); checks @@ -230,7 +243,7 @@ private: } } - exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType); + exportInfo->Items.emplace_back(srcPath, path.Base()->PathId, path->PathType); exportInfo->PendingItems.push_back(itemIdx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp b/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp index e8f81a6946..2851f91ffb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_uploaders.cpp @@ -477,7 +477,6 @@ private: } writer.CloseMap(); } - writer.Write("exportedObjects", "SchemaMappingV0"); writer.CloseMap(); writer.CloseMap(); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index f0b26158dd..518bcbbf79 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4949,6 +4949,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvImport::TEvForgetImportRequest, Handle); HFuncTraced(TEvImport::TEvListImportsRequest, Handle); HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle); + HFuncTraced(TEvPrivate::TEvImportSchemaMappingReady, Handle); HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle); // } // NImport diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 6f3ea3f723..9b4a405f96 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1305,6 +1305,7 @@ public: void FromXxportInfo(NKikimrImport::TImport& exprt, const TImportInfo::TPtr importInfo); static void PersistCreateImport(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); + static void PersistSchemaMappingImportFields(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistRemoveImport(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); @@ -1333,6 +1334,7 @@ public: NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe<ui32>& itemIdx = Nothing()); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); @@ -1345,6 +1347,7 @@ public: void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx); void ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 05a002eee5..2ccd2d7572 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -60,6 +60,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI } switch (importInfo->State) { + case TImportInfo::EState::DownloadExportMetadata: + import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_PREPARING); + break; case TImportInfo::EState::Waiting: switch (GetMinState(importInfo)) { case TImportInfo::EState::GetScheme: @@ -136,8 +139,27 @@ void TSchemeShard::PersistCreateImport(NIceDb::TNiceDb& db, const TImportInfo::T db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update( NIceDb::TUpdate<Schema::ImportItems::DstPathName>(item.DstPathName), - NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)) + NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)), + NIceDb::TUpdate<Schema::ImportItems::SrcPrefix>(item.SrcPrefix) + ); + } +} + +void TSchemeShard::PersistSchemaMappingImportFields(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo) { + // There can be new items, so do at least the same as for creation + for (ui32 itemIdx : xrange(importInfo->Items.size())) { + const auto& item = importInfo->Items.at(itemIdx); + + db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate<Schema::ImportItems::DstPathName>(item.DstPathName), + NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)), + NIceDb::TUpdate<Schema::ImportItems::SrcPrefix>(item.SrcPrefix) ); + if (item.ExportItemIV) { + db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate<Schema::ImportItems::EncryptionIV>(item.ExportItemIV->GetBinaryString()) + ); + } } } @@ -244,6 +266,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct Execute(CreateTxProgressImport(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressImport(ev), ctx); +} + void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) { Execute(CreateTxProgressImport(ev), ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp index d494c514c3..efed43d23d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp @@ -54,6 +54,7 @@ struct TSchemeShard::TImport::TTxCancel: public TSchemeShard::TXxport::TTxBase { case TImportInfo::EState::Cancelled: return respond(Ydb::StatusIds::SUCCESS); + case TImportInfo::EState::DownloadExportMetadata: case TImportInfo::EState::Waiting: case TImportInfo::EState::Cancellation: importInfo->Issue = "Cancelled manually"; diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 98764f8f9b..5a0d4b9ddd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -67,6 +67,42 @@ TString GetDatabase(TSchemeShard& ss) { return CanonizePath(ss.RootPathElements); } +bool ValidateDstPath(const TString& dstPath, TSchemeShard* ss, TString& explain) { + const TPath path = TPath::Resolve(dstPath, ss); + TPath::TChecker checks = path.Check(); + checks + .IsAtLocalSchemeShard() + .HasResolvedPrefix() + .FailOnRestrictedCreateInTempZone(); + + if (path.IsResolved()) { + checks + .IsResolved() + .IsDeleted(); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks) { + checks + .IsValidLeafName() + .DepthLimit() + .PathsLimit(); + + if (path.Parent().IsResolved()) { + checks.DirChildrenLimit(); + } + } + + if (!checks) { + explain = checks.GetError(); + return false; + } + return true; +} + } struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { @@ -141,6 +177,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } TImportInfo::TPtr importInfo = nullptr; + TImportInfo::EState initialState = TImportInfo::EState::Waiting; switch (request.GetRequest().GetSettingsCase()) { case NKikimrImport::TCreateImportRequest::kImportFromS3Settings: @@ -150,6 +187,10 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { settings.set_scheme(Ydb::Import::ImportFromS3Settings::HTTPS); } + if (!settings.source_prefix().empty() && AppData()->FeatureFlags.GetEnableEncryptedExport()) { + initialState = TImportInfo::EState::DownloadExportMetadata; + } + importInfo = new TImportInfo(id, uid, TImportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); if (request.HasUserSID()) { @@ -172,7 +213,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { NIceDb::TNiceDb db(txc.DB); Self->PersistCreateImport(db, importInfo); - importInfo->State = TImportInfo::EState::Waiting; + importInfo->State = initialState; importInfo->StartTime = TAppData::TimeProvider->Now(); Self->PersistImportState(db, importInfo); @@ -232,42 +273,12 @@ private: return false; } - const TPath path = TPath::Resolve(dstPath, Self); - { - TPath::TChecker checks = path.Check(); - checks - .IsAtLocalSchemeShard() - .HasResolvedPrefix() - .FailOnRestrictedCreateInTempZone(); - - if (path.IsResolved()) { - checks - .IsResolved() - .IsDeleted(); - } else { - checks - .NotEmpty() - .NotResolved(); - } - - if (checks) { - checks - .IsValidLeafName() - .DepthLimit() - .PathsLimit(); - - if (path.Parent().IsResolved()) { - checks.DirChildrenLimit(); - } - } - - if (!checks) { - explain = checks.GetError(); - return false; - } + if (!ValidateDstPath(dstPath, Self, explain)) { + return false; } - importInfo->Items.emplace_back(dstPath); + auto& item = importInfo->Items.emplace_back(dstPath); + item.SrcPrefix = settings.items(itemIdx).source_prefix(); } return true; @@ -284,6 +295,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TMaybe<ui32> ItemIdx; TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr; + TEvPrivate::TEvImportSchemaMappingReady::TPtr SchemaMappingResult = nullptr; TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; @@ -303,6 +315,13 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev) + : TXxport::TTxBase(self) + , Id(ev->Get()->ImportId) + , SchemaMappingResult(ev) + { + } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) : TXxport::TTxBase(self) , SchemeQueryResult(ev) @@ -342,6 +361,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (SchemeResult) { OnSchemeResult(txc, ctx); + } else if (SchemaMappingResult) { + OnSchemaMappingResult(txc, ctx); } else if (SchemeQueryResult) { OnSchemeQueryPreparation(txc, ctx); } else if (AllocateResult) { @@ -372,10 +393,18 @@ private: << ": info# " << importInfo->ToString() << ", item# " << item.ToString(itemIdx)); - item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx)); + item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx, item.ExportItemIV)); Self->RunningImportSchemeGetters.emplace(item.SchemeGetter); } + void GetSchemaMapping(TImportInfo::TPtr importInfo, const TActorContext& ctx) { + LOG_I("TImport::TTxProgress: Download schema mapping" + << ": info# " << importInfo->ToString()); + + importInfo->SchemaMappingGetter = ctx.RegisterWithSameMailbox(CreateSchemaMappingGetter(Self->SelfId(), importInfo)); + Self->RunningImportSchemeGetters.emplace(importInfo->SchemaMappingGetter); + } + void CreateTable(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -685,15 +714,27 @@ private: } void Cancel(TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf marker) { - Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); - const auto& item = importInfo->Items.at(itemIdx); + const TItem* item = nullptr; + if (itemIdx != ui32(-1)) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + item = &importInfo->Items.at(itemIdx); + } + TStringBuilder itemLogStr; + if (item) { + itemLogStr << ", item# " << item->ToString(itemIdx); + } LOG_N("TImport::TTxProgress: " << marker << ", cancelling" << ", info# " << importInfo->ToString() - << ", item# " << item.ToString(itemIdx)); + << itemLogStr); importInfo->State = EState::Cancelled; + if (auto schemaMappingGetter = std::exchange(importInfo->SchemaMappingGetter, {})) { + Send(schemaMappingGetter, new TEvents::TEvPoisonPill()); + Self->RunningImportSchemeGetters.erase(schemaMappingGetter); + } + for (ui32 i : xrange(importInfo->Items.size())) { KillChildActors(importInfo->Items[i]); if (i == itemIdx) { @@ -720,14 +761,16 @@ private: } void CancelAndPersist(NIceDb::TNiceDb& db, TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf itemIssue, TStringBuf marker) { - Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); - auto& item = importInfo->Items[itemIdx]; + if (itemIdx != ui32(-1)) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; - item.Issue = itemIssue; - PersistImportItemState(db, importInfo, itemIdx); + item.Issue = itemIssue; + PersistImportItemState(db, importInfo, itemIdx); - if (importInfo->State != EState::Waiting) { - return; + if (importInfo->State != EState::Waiting) { + return; + } } Cancel(importInfo, itemIdx, marker); @@ -801,11 +844,20 @@ private: << ": id# " << Id << ", itemIdx# " << ItemIdx); - if (ItemIdx) { - Resume(importInfo, *ItemIdx, txc, ctx); - } else { - for (ui32 itemIdx : xrange(importInfo->Items.size())) { - Resume(importInfo, itemIdx, txc, ctx); + switch (importInfo->State) { + case EState::DownloadExportMetadata: { + GetSchemaMapping(importInfo, ctx); + break; + } + default: { + if (ItemIdx) { + Resume(importInfo, *ItemIdx, txc, ctx); + } else { + for (ui32 itemIdx : xrange(importInfo->Items.size())) { + Resume(importInfo, itemIdx, txc, ctx); + } + } + break; } } } @@ -943,7 +995,7 @@ private: // Send the creation query to KQP to prepare. const auto database = GetDatabase(*Self); const TString source = TStringBuilder() - << importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName; + << importInfo->GetItemSrcPrefix(msg.ItemIdx) << NYdb::NDump::NFiles::CreateView().FileName; NYql::TIssues issues; if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, issues)) { @@ -965,6 +1017,94 @@ private: } } + void OnSchemaMappingResult(TTransactionContext& txc, const TActorContext& ctx) { + Y_ABORT_UNLESS(SchemaMappingResult); + + const auto& msg = *SchemaMappingResult->Get(); + + LOG_D("TImport::TTxProgress: OnSchemaMappingResult" + << ": id# " << msg.ImportId + << ", success# " << msg.Success + ); + + if (!Self->Imports.contains(msg.ImportId)) { + LOG_E("TImport::TTxProgress: OnSchemaMappingResult received unknown id" + << ": id# " << msg.ImportId); + return; + } + + TImportInfo::TPtr importInfo = Self->Imports.at(msg.ImportId); + + NIceDb::TNiceDb db(txc.DB); + + Self->RunningImportSchemeGetters.erase(std::exchange(importInfo->SchemaMappingGetter, {})); + + if (!msg.Success) { + return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot get schema mapping: " << msg.Error); + } + + if (!importInfo->SchemaMapping->Items.empty()) { + if (importInfo->Settings.has_encryption_settings() != importInfo->SchemaMapping->Items[0].IV.Defined()) { + return CancelAndPersist(db, importInfo, -1, {}, "incorrect schema mapping"); + } + } + + // Path in database for import + TVector<TString> dstPath; + if (importInfo->Settings.destination_path().empty()) { + dstPath = Self->RootPathElements; + } else { + dstPath = SplitPath(importInfo->Settings.destination_path()); + } + TString sourcePrefix = importInfo->Settings.source_prefix(); + if (sourcePrefix && sourcePrefix.back() != '/') { + sourcePrefix.push_back('/'); + } + auto combineDstPath = [&](const TString& mappingObjectPath) { + TVector<TString> objectPath = SplitPath(mappingObjectPath); + TVector<TString> dstObjectPath = dstPath; + dstObjectPath.insert(dstObjectPath.end(), objectPath.begin(), objectPath.end()); + return CombinePath(dstObjectPath.begin(), dstObjectPath.end()); + }; + auto init = [&](const NBackup::TSchemaMapping::TItem& schemaMappingItem, NSchemeShard::TImportInfo::TItem& item) { + TStringBuf exportPrefix(schemaMappingItem.ExportPrefix); + exportPrefix.SkipPrefix("/"); + item.SrcPrefix = TStringBuilder() << sourcePrefix << exportPrefix; + item.ExportItemIV = schemaMappingItem.IV; + }; + if (importInfo->Items.empty()) { // Fill the whole list from schema mapping + for (const auto& schemaMappingItem : importInfo->SchemaMapping->Items) { + TString dstPath = combineDstPath(schemaMappingItem.ObjectPath); + TString explain; + if (!ValidateDstPath(dstPath, Self, explain)) { + return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot validate mapping: " << explain); + } + + auto& item = importInfo->Items.emplace_back(dstPath); + init(schemaMappingItem, item); + } + } else { // Take existing items from items list + THashMap<TString, size_t> schemaMappingIndex; + for (size_t i = 0; i < importInfo->SchemaMapping->Items.size(); ++i) { + schemaMappingIndex[combineDstPath(importInfo->SchemaMapping->Items[i].ObjectPath)] = i; + } + for (auto& item : importInfo->Items) { + TString dstPath = CanonizePath(item.DstPathName); + auto mappingIt = schemaMappingIndex.find(dstPath); + if (mappingIt == schemaMappingIndex.end()) { + return CancelAndPersist(db, importInfo, -1, {}, TStringBuilder() << "cannot find path " << dstPath << " in schema mapping"); + } + const auto& schemaMappingItem = importInfo->SchemaMapping->Items[mappingIt->second]; + init(schemaMappingItem, item); + } + } + + importInfo->State = EState::Waiting; + PersistImportState(db, importInfo); + PersistSchemaMappingImportFields(db, importInfo); + Resume(txc, ctx); + } + void OnSchemeQueryPreparation(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(SchemeQueryResult); const auto& message = *SchemeQueryResult.Get()->Get(); @@ -1408,6 +1548,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe return new TImport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemaMappingReady::TPtr& ev) { + return new TImport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) { return new TImport::TTxProgress(this, ev); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index 01984722d3..745df56387 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -144,6 +144,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose( task.SetTableName(dstPath.LeafName()); *task.MutableTableDescription() = RebuildTableDescription(GetTableDescription(ss, item.DstPathId), item.Scheme); + if (importInfo->Settings.has_encryption_settings()) { + auto& taskEncryptionSettings = *task.MutableEncryptionSettings(); + *taskEncryptionSettings.MutableSymmetricKey() = importInfo->Settings.encryption_settings().symmetric_key(); + if (item.ExportItemIV) { + taskEncryptionSettings.SetIV(item.ExportItemIV->GetBinaryString()); + } + } + switch (importInfo->Kind) { case TImportInfo::EKind::S3: { @@ -153,7 +161,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose( restoreSettings.SetBucket(importInfo->Settings.bucket()); restoreSettings.SetAccessKey(importInfo->Settings.access_key()); restoreSettings.SetSecretKey(importInfo->Settings.secret_key()); - restoreSettings.SetObjectKeyPattern(importInfo->Settings.items(itemIdx).source_prefix()); + restoreSettings.SetObjectKeyPattern(importInfo->GetItemSrcPrefix(itemIdx)); restoreSettings.SetUseVirtualAddressing(!importInfo->Settings.disable_virtual_addressing()); switch (importInfo->Settings.scheme()) { @@ -338,7 +346,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose( auto* tabletConfig = pqGroup.MutablePQTabletConfig(); const auto& pqConfig = AppData()->PQConfig; - + for (const auto& consumer : topic.consumers()) { auto& addedConsumer = *tabletConfig->AddConsumers(); auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig); @@ -347,7 +355,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose( addedConsumer.SetImportant(true); } } - + return propose; } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index e09e9efadc..139e3c8dfc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -3,6 +3,7 @@ #include "schemeshard_private.h" #include <ydb/core/backup/common/checksum.h> +#include <ydb/core/backup/common/encryption.h> #include <ydb/core/backup/common/metadata.h> #include <ydb/core/wrappers/s3_storage_config.h> #include <ydb/core/wrappers/s3_wrapper.h> @@ -12,6 +13,8 @@ #include <ydb/library/actors/core/hfunc.h> #include <ydb/public/lib/ydb_cli/dump/files/files.h> +#include <library/cpp/json/json_reader.h> + #include <google/protobuf/text_format.h> #include <util/string/subst.h> @@ -26,31 +29,173 @@ using namespace Aws::Client; using namespace Aws::S3; using namespace Aws; +static constexpr TDuration MaxDelay = TDuration::Minutes(10); + +template <class TDerived> +class TGetterFromS3 : public TActorBootstrapped<TDerived> { +protected: + explicit TGetterFromS3(TImportInfo::TPtr importInfo, TMaybe<NBackup::TEncryptionIV> iv) + : ImportInfo(std::move(importInfo)) + , ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(ImportInfo->Settings)) + , IV(std::move(iv)) + , Retries(ImportInfo->Settings.number_of_retries()) + { + if (ImportInfo->Settings.has_encryption_settings()) { + Key = NBackup::TEncryptionKey(ImportInfo->Settings.encryption_settings().symmetric_key().key()); + } + } + + void HeadObject(const TString& key, bool autoAddEncSuffix = true) { + auto request = Model::HeadObjectRequest() + .WithKey(GetKey(key, autoAddEncSuffix)); + + this->Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request)); + } + + void GetObject(const TString& key, const std::pair<ui64, ui64>& range, bool autoAddEncSuffix = true) { + auto request = Model::GetObjectRequest() + .WithKey(GetKey(key, autoAddEncSuffix)) + .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); + + this->Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request)); + } + + void GetObject(const TString& key, ui64 contentLength, bool autoAddEncSuffix = true) { + GetObject(key, std::make_pair(0, contentLength - 1), autoAddEncSuffix); + } + + void ListObjects(const TString& prefix) { + auto request = Model::ListObjectsRequest() + .WithPrefix(prefix); + + this->Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request)); + } + + void Download(const TString& key, bool autoAddEncSuffix = true) { + CreateClient(); + HeadObject(key, autoAddEncSuffix); + } + + void CreateClient() { + if (Client) { + this->Send(Client, new TEvents::TEvPoisonPill()); + } + Client = this->RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); + } + + void PassAway() override { + this->Send(Client, new TEvents::TEvPoisonPill()); + TActorBootstrapped<TDerived>::PassAway(); + } + + TString GetKey(TString key, bool autoAddEncSuffix = true) { + if (autoAddEncSuffix && ImportInfo->Settings.has_encryption_settings()) { + key += ".enc"; + } + return key; + } + + template <typename TResult> + bool CheckResult(const TResult& result, const TStringBuf marker) { + if (result.IsSuccess()) { + return true; + } + + LOG_E("Error at '" << marker << "'" + << ": self# " << this->SelfId() + << ", error# " << result); + MaybeRetry(result.GetError()); + + return false; + } + + void MaybeRetry(const Aws::S3::S3Error& error) { + if (Attempt < Retries && error.ShouldRetry()) { + Delay = Min(Delay * ++Attempt, MaxDelay); + this->Schedule(Delay, new TEvents::TEvWakeup()); + } else { + Reply(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str()); + } + } + + void ResetRetries() { + Attempt = 0; + } + + // If export is encrypted, decrypts and gets export IV, + // else returns true + bool MaybeDecryptAndSaveIV(const TString& content, TString& result) { + if (Key) { + try { + auto [buffer, iv] = NBackup::TEncryptedFileDeserializer::DecryptFullFile(*Key, TBuffer(content.data(), content.size())); + IV = iv; + result.assign(buffer.Data(), buffer.Size()); + return true; + } catch (const std::exception& ex) { + Reply(false, ex.what()); + return false; + } + } + result = content; + return true; + } + + bool MaybeDecrypt(const TString& content, TString& result, NBackup::EBackupFileType fileType) { + if (Key && IV) { + try { + NBackup::TEncryptionIV expectedIV = NBackup::TEncryptionIV::Combine(*IV, fileType, 0 /* already combined */, 0); + auto buffer = NBackup::TEncryptedFileDeserializer::DecryptFullFile(*Key, expectedIV, TBuffer(content.data(), content.size())); + result.assign(buffer.Data(), buffer.Size()); + return true; + } catch (const std::exception& ex) { + Reply(false, ex.what()); + return false; + } + } + result = content; + return true; + } + + virtual void Reply(bool success = true, const TString& error = TString()) = 0; + +protected: + TImportInfo::TPtr ImportInfo; + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TActorId Client; + TMaybe<NBackup::TEncryptionKey> Key; + TMaybe<NBackup::TEncryptionIV> IV; + + const ui32 Retries; + ui32 Attempt = 0; + + TDuration Delay = TDuration::Minutes(1); +}; + // Downloads scheme-related objects from S3 -class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { - static TString MetadataKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json"; +class TSchemeGetter: public TGetterFromS3<TSchemeGetter> { + static TString MetadataKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/metadata.json"; } - static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, TStringBuf filename) { - Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << '/' << filename; + static TString SchemeKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, TStringBuf filename) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << '/' << filename; } - static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb"; + static TString PermissionsKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/permissions.pb"; } - static TString ChangefeedDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) { - Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb"; + static TString ChangefeedDescriptionKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, const TString& changefeedName) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/" << changefeedName << "/changefeed_description.pb"; } - static TString TopicDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) { - Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/topic_description.pb"; + static TString TopicDescriptionKeyFromSettings(const TImportInfo::TPtr& importInfo, ui32 itemIdx, const TString& changefeedName) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + return TStringBuilder() << importInfo->GetItemSrcPrefix(itemIdx) << "/" << changefeedName << "/topic_description.pb"; } static bool IsView(TStringBuf schemeKey) { @@ -61,13 +206,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY; } - void HeadObject(const TString& key) { - auto request = Model::HeadObjectRequest() - .WithKey(key); - - Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request)); - } - void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { const auto& result = ev->Get()->Result; @@ -79,8 +217,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); - GetObject(MetadataKey, std::make_pair(0, contentLength - 1)); + GetObject(MetadataKey, result.GetResult().GetContentLength()); } void HandleScheme(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -92,7 +229,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { if (!IsView(SchemeKey) && NoObjectFound(result.GetError().GetErrorType())) { // try search for a view - SchemeKey = SchemeKeyFromSettings(ImportInfo->Settings, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName); + SchemeKey = SchemeKeyFromSettings(ImportInfo, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName); HeadObject(SchemeKey); return; } @@ -101,8 +238,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); - GetObject(SchemeKey, std::make_pair(0, contentLength - 1)); + GetObject(SchemeKey, result.GetResult().GetContentLength()); } void HandlePermissions(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -119,8 +255,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); - GetObject(PermissionsKey, std::make_pair(0, contentLength - 1)); + GetObject(PermissionsKey, result.GetResult().GetContentLength()); } void HandleChecksum(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -134,8 +269,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); - GetObject(NBackup::ChecksumKey(CurrentObjectKey), std::make_pair(0, contentLength - 1)); + GetObject(NBackup::ChecksumKey(CurrentObjectKey), result.GetResult().GetContentLength(), false); } void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -149,9 +283,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size()); - GetObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1)); + GetObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), result.GetResult().GetContentLength()); } void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -165,17 +298,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } - const auto contentLength = result.GetResult().GetContentLength(); Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size()); - GetObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1)); - } - - void GetObject(const TString& key, const std::pair<ui64, ui64>& range) { - auto request = Model::GetObjectRequest() - .WithKey(key) - .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); - - Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request)); + GetObject(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), result.GetResult().GetContentLength()); } void HandleMetadata(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -190,14 +314,19 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::Metadata)) { + return; + } + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); auto& item = ImportInfo->Items.at(ItemIdx); LOG_T("Trying to parse metadata" << ": self# " << SelfId() - << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); - item.Metadata = NBackup::TMetadata::Deserialize(msg.Body); + item.Metadata = NBackup::TMetadata::Deserialize(content); if (item.Metadata.HasVersion() && item.Metadata.GetVersion() == 0) { NeedValidateChecksums = false; @@ -208,7 +337,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { }; if (NeedValidateChecksums) { - StartValidatingChecksum(MetadataKey, msg.Body, nextStep); + StartValidatingChecksum(MetadataKey, content, nextStep); } else { nextStep(); } @@ -226,6 +355,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableSchema)) { + return; + } + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); auto& item = ImportInfo->Items.at(ItemIdx); @@ -233,11 +367,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { << ": self# " << SelfId() << ", itemIdx# " << ItemIdx << ", schemeKey# " << SchemeKey - << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); if (IsView(SchemeKey)) { - item.CreationQuery = msg.Body; - } else if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { + item.CreationQuery = content; + } else if (!google::protobuf::TextFormat::ParseFromString(content, &item.Scheme)) { return Reply(false, "Cannot parse scheme"); } @@ -250,7 +384,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { }; if (NeedValidateChecksums) { - StartValidatingChecksum(SchemeKey, msg.Body, nextStep); + StartValidatingChecksum(SchemeKey, content, nextStep); } else { nextStep(); } @@ -268,15 +402,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::Permissions)) { + return; + } + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); auto& item = ImportInfo->Items.at(ItemIdx); LOG_T("Trying to parse permissions" << ": self# " << SelfId() - << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); Ydb::Scheme::ModifyPermissionsRequest permissions; - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &permissions)) { + if (!google::protobuf::TextFormat::ParseFromString(content, &permissions)) { return Reply(false, "Cannot parse permissions"); } item.Permissions = std::move(permissions); @@ -286,7 +425,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { }; if (NeedValidateChecksums) { - StartValidatingChecksum(PermissionsKey, msg.Body, nextStep); + StartValidatingChecksum(PermissionsKey, content, nextStep); } else { nextStep(); } @@ -326,15 +465,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableChangefeed)) { + return; + } + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); auto& item = ImportInfo->Items.at(ItemIdx); LOG_T("Trying to parse changefeed" << ": self# " << SelfId() - << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); Ydb::Table::ChangefeedDescription changefeed; - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) { + if (!google::protobuf::TextFormat::ParseFromString(content, &changefeed)) { return Reply(false, "Cannot parse сhangefeed"); } @@ -342,14 +486,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { auto nextStep = [this]() { Become(&TThis::StateDownloadTopics); - HeadObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); + HeadObject(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); }; if (NeedValidateChecksums) { - StartValidatingChecksum(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep); + StartValidatingChecksum(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), content, nextStep); } else { nextStep(); - } + } } void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -364,15 +508,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { return; } + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::TableTopic)) { + return; + } + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); auto& item = ImportInfo->Items.at(ItemIdx); LOG_T("Trying to parse topic" << ": self# " << SelfId() - << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); Ydb::Topic::DescribeTopicResult topic; - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) { + if (!google::protobuf::TextFormat::ParseFromString(content, &topic)) { return Reply(false, "Cannot parse topic"); } *item.Changefeeds.MutableChangefeeds(IndexDownloadedChangefeed)->MutableTopic() = std::move(topic); @@ -382,22 +531,15 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { Reply(); } else { Become(&TThis::StateDownloadChangefeeds); - HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); + HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); } }; if (NeedValidateChecksums) { - StartValidatingChecksum(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep); + StartValidatingChecksum(TopicDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), content, nextStep); } else { nextStep(); - } - } - - void ListObjects(const TString& prefix) { - auto request = Model::ListObjectsRequest() - .WithPrefix(prefix); - - Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request)); + } } template <typename T> @@ -431,37 +573,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { Resize(item.Changefeeds.MutableChangefeeds(), ChangefeedsNames.size()); Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size()); - HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); + HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed])); } else { Reply(); } } - template <typename TResult> - bool CheckResult(const TResult& result, const TStringBuf marker) { - if (result.IsSuccess()) { - return true; - } - - LOG_E("Error at '" << marker << "'" - << ": self# " << SelfId() - << ", error# " << result); - MaybeRetry(result.GetError()); - - return false; - } - - void MaybeRetry(const Aws::S3::S3Error& error) { - if (Attempt < Retries && error.ShouldRetry()) { - Delay = Min(Delay * ++Attempt, MaxDelay); - Schedule(Delay, new TEvents::TEvWakeup()); - } else { - Reply(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str()); - } - } - - void Reply(bool success = true, const TString& error = TString()) { + void Reply(bool success = true, const TString& error = TString()) override { LOG_I("Reply" << ": self# " << SelfId() << ", success# " << success @@ -471,26 +590,9 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { PassAway(); } - void PassAway() override { - Send(Client, new TEvents::TEvPoisonPill()); - TActor::PassAway(); - } - - void CreateClient() { - if (Client) { - Send(Client, new TEvents::TEvPoisonPill()); - } - Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); - } - void ListChangefeeds() { CreateClient(); - ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix()); - } - - void Download(const TString& key) { - CreateClient(); - HeadObject(key); + ListObjects(ImportInfo->GetItemSrcPrefix(ItemIdx)); } void DownloadMetadata() { @@ -506,7 +608,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { } void DownloadChecksum() { - Download(NBackup::ChecksumKey(CurrentObjectKey)); + Download(NBackup::ChecksumKey(CurrentObjectKey), false); } void DownloadChangefeeds() { @@ -514,10 +616,6 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { ListChangefeeds(); } - void ResetRetries() { - Attempt = 0; - } - void StartDownloadingScheme() { ResetRetries(); DownloadScheme(); @@ -546,17 +644,15 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> { } public: - explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) - : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings)) + explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv) + : TGetterFromS3<TSchemeGetter>(std::move(importInfo), std::move(iv)) , ReplyTo(replyTo) - , ImportInfo(importInfo) , ItemIdx(itemIdx) - , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx)) - , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx, "scheme.pb")) - , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx)) - , Retries(importInfo->Settings.number_of_retries()) - , NeedDownloadPermissions(!importInfo->Settings.no_acl()) - , NeedValidateChecksums(!importInfo->Settings.skip_checksum_validation()) + , MetadataKey(MetadataKeyFromSettings(ImportInfo, itemIdx)) + , SchemeKey(SchemeKeyFromSettings(ImportInfo, itemIdx, "scheme.pb")) + , PermissionsKey(PermissionsKeyFromSettings(ImportInfo, itemIdx)) + , NeedDownloadPermissions(!ImportInfo->Settings.no_acl()) + , NeedValidateChecksums(!ImportInfo->Settings.skip_checksum_validation()) { } @@ -627,9 +723,7 @@ public: } private: - NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; const TActorId ReplyTo; - TImportInfo::TPtr ImportInfo; const ui32 ItemIdx; const TString MetadataKey; @@ -638,16 +732,8 @@ private: TVector<TString> ChangefeedsNames; ui64 IndexDownloadedChangefeed = 0; - const ui32 Retries; - ui32 Attempt = 0; - - TDuration Delay = TDuration::Minutes(1); - static constexpr TDuration MaxDelay = TDuration::Minutes(10); - const bool NeedDownloadPermissions = true; - TActorId Client; - bool NeedValidateChecksums = true; TString CurrentObjectChecksum; @@ -655,8 +741,192 @@ private: std::function<void()> ChecksumValidatedCallback; }; // TSchemeGetter -IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) { - return new TSchemeGetter(replyTo, importInfo, itemIdx); +class TSchemaMappingGetter : public TGetterFromS3<TSchemaMappingGetter> { + static TString SchemaMappingKeyFromSettings(const TImportInfo::TPtr& importInfo) { + return TStringBuilder() << importInfo->Settings.source_prefix() << "/SchemaMapping/mapping.json"; + } + + static TString SchemaMappingMetadataKeyFromSettings(const TImportInfo::TPtr& importInfo) { + return TStringBuilder() << importInfo->Settings.source_prefix() << "/SchemaMapping/metadata.json"; + } + + void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleMetadata TEvExternalStorage::TEvHeadObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "HeadObject")) { + return; + } + + GetObject(MetadataKey, result.GetResult().GetContentLength()); + } + + void HandleSchemaMapping(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleSchemaMapping TEvExternalStorage::TEvHeadObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "HeadObject")) { + return; + } + + GetObject(SchemaMappingKey, result.GetResult().GetContentLength()); + } + + void HandleMetadata(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { + const auto& msg = *ev->Get(); + const auto& result = msg.Result; + + LOG_D("HandleMetadata TEvExternalStorage::TEvGetObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "GetObject")) { + return; + } + + TString content; + if (!MaybeDecryptAndSaveIV(msg.Body, content)) { + return; + } + ImportInfo->ExportIV = IV; + + LOG_T("Trying to parse metadata" + << ": self# " << SelfId() + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); + + if (!ProcessMetadata(content)) { + return; + } + + auto nextStep = [this]() { + StartDownloadingSchemaMapping(); + }; + + nextStep(); + } + + void HandleSchemaMapping(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { + const auto& msg = *ev->Get(); + const auto& result = msg.Result; + + LOG_D("HandleSchemaMapping TEvExternalStorage::TEvGetObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "GetObject")) { + return; + } + + TString content; + if (!MaybeDecrypt(msg.Body, content, NBackup::EBackupFileType::SchemaMapping)) { + return; + } + + LOG_T("Trying to parse scheme" + << ": self# " << SelfId() + << ", schemaMappingKey# " << SchemaMappingKey + << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); + + ImportInfo->SchemaMapping.ConstructInPlace(); + TString error; + if (!ImportInfo->SchemaMapping->Deserialize(content, error)) { + Reply(false, error); + return; + } + + Reply(); + } + + void Reply(bool success = true, const TString& error = TString()) override { + LOG_I("Reply" + << ": self# " << SelfId() + << ", success# " << success + << ", error# " << error); + + Send(ReplyTo, new TEvPrivate::TEvImportSchemaMappingReady(ImportInfo->Id, success, error)); + PassAway(); + } + + void DownloadMetadata() { + Download(MetadataKey); + } + + void DownloadSchemaMapping() { + Download(SchemaMappingKey); + } + + void StartDownloadingSchemaMapping() { + ResetRetries(); + DownloadSchemaMapping(); + Become(&TThis::StateDownloadSchemaMapping); + } + + bool ProcessMetadata(const TString& content) { + NJson::TJsonValue json; + if (!NJson::ReadJsonTree(content, &json)) { + Reply(false, "Failed to parse metadata json"); + return false; + } + const NJson::TJsonValue& kind = json["kind"]; + if (kind.GetString() != "SchemaMappingV0") { + Reply(false, TStringBuilder() << "Unknown kind of metadata json: " << kind.GetString()); + return false; + } + return true; + } + +public: + TSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) + : TGetterFromS3<TSchemaMappingGetter>(std::move(importInfo), Nothing()) + , ReplyTo(replyTo) + , MetadataKey(SchemaMappingMetadataKeyFromSettings(ImportInfo)) + , SchemaMappingKey(SchemaMappingKeyFromSettings(ImportInfo)) + { + } + + void Bootstrap() { + DownloadMetadata(); + Become(&TThis::StateDownloadMetadata); + } + + STATEFN(StateDownloadMetadata) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleMetadata); + hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleMetadata); + + sFunc(TEvents::TEvWakeup, DownloadMetadata); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateDownloadSchemaMapping) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleSchemaMapping); + hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleSchemaMapping); + + sFunc(TEvents::TEvWakeup, DownloadSchemaMapping); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + +private: + const TActorId ReplyTo; + const TString MetadataKey; + const TString SchemaMappingKey; +}; // TSchemaMappingGetter + +IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv) { + return new TSchemeGetter(replyTo, std::move(importInfo), itemIdx, std::move(iv)); +} + +IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) { + return new TSchemaMappingGetter(replyTo, std::move(importInfo)); } } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h index 24b67c7c65..32b108c20e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.h @@ -6,7 +6,9 @@ namespace NKikimr { namespace NSchemeShard { -IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx); +IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV> iv); + +IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo); } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp index 9c3c9f91fb..f3b6aee836 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter_fallback.cpp @@ -25,11 +25,32 @@ private: const TActorId ReplyTo; TImportInfo::TPtr ImportInfo; const ui32 ItemIdx; +}; // TSchemeGetterFallback + +class TSchemaMappingGetterFallback: public TActorBootstrapped<TSchemaMappingGetterFallback> { +public: + explicit TSchemaMappingGetterFallback(const TActorId& replyTo, TImportInfo::TPtr importInfo) + : ReplyTo(replyTo) + , ImportInfo(importInfo) + { + } + + void Bootstrap() { + Send(ReplyTo, new TEvPrivate::TEvImportSchemaMappingReady(ImportInfo->Id, false, "Imports from S3 are disabled")); + PassAway(); + } +private: + const TActorId ReplyTo; + TImportInfo::TPtr ImportInfo; }; // TSchemeGetterFallback -IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) { - return new TSchemeGetterFallback(replyTo, importInfo, itemIdx); +IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe<NBackup::TEncryptionIV>) { + return new TSchemeGetterFallback(replyTo, std::move(importInfo), itemIdx); +} + +IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) { + return new TSchemaMappingGetterFallback(replyTo, std::move(importInfo)); } } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 351a21adf0..7e0879ea53 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -12,6 +12,7 @@ #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/control/lib/immediate_control_board_impl.h> +#include <ydb/core/backup/common/encryption.h> #include <ydb/core/backup/common/metadata.h> #include <ydb/core/base/feature_flags.h> #include <ydb/core/base/table_vector_index.h> @@ -2846,12 +2847,13 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> { enum class EState: ui8 { Invalid = 0, - Waiting, - GetScheme, - CreateSchemeObject, - Transferring, - BuildIndexes, - CreateChangefeed, + Waiting = 1, + GetScheme = 2, + CreateSchemeObject = 3, + Transferring = 4, + BuildIndexes = 5, + CreateChangefeed = 6, + DownloadExportMetadata = 7, Done = 240, Cancellation = 250, Cancelled = 251, @@ -2875,6 +2877,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> { TString DstPathName; TPathId DstPathId; + TString SrcPrefix; Ydb::Table::CreateTableRequest Scheme; TString CreationQuery; TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery; @@ -2892,6 +2895,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> { int NextChangefeedIdx = 0; TString Issue; TPathId StreamImplPathId; + TMaybe<NBackup::TEncryptionIV> ExportItemIV; TItem() = default; @@ -2918,6 +2922,9 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> { TPathId DomainPathId; TMaybe<TString> UserSID; TString PeerName; // required for making audit log records + TMaybe<NBackup::TEncryptionIV> ExportIV; + TMaybe<NBackup::TSchemaMapping> SchemaMapping; + TActorId SchemaMappingGetter; EState State = EState::Invalid; TString Issue; @@ -2929,6 +2936,20 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> { TInstant StartTime = TInstant::Zero(); TInstant EndTime = TInstant::Zero(); + TString GetItemSrcPrefix(size_t i) const { + if (i < Items.size() && Items[i].SrcPrefix) { + return Items[i].SrcPrefix; + } + + // Backward compatibility. + // But there can be no paths in settings at all. + if (i < ui32(Settings.items_size())) { + return Settings.items(i).source_prefix(); + } + + return {}; + } + explicit TImportInfo( const ui64 id, const TString& uid, @@ -3195,8 +3216,8 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { Parent = ParentEnd(); } - void Set(ui32 level, - NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent, + void Set(ui32 level, + NTableIndex::TClusterId parentBegin, NTableIndex::TClusterId parent, NTableIndex::TClusterId childBegin, NTableIndex::TClusterId child, ui32 state, ui64 tableSize) { Level = level; @@ -3665,7 +3686,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { TSerializedTableRange bound{range}; LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::BUILD_INDEX, - "AddShardStatus id# " << Id << " shard " << shardIdx << + "AddShardStatus id# " << Id << " shard " << shardIdx << " range " << KMeans.RangeToDebugStr(bound, IsBuildPrefixedVectorIndex() ? 2 : 1)); AddParent(bound, shardIdx); Shards.emplace( diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 1e653640ce..d6bcba2d81 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -20,6 +20,7 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvImportSchemaMappingReady, EvImportSchemeQueryResult, EvExportSchemeUploadResult, EvExportUploadMetadataResult, @@ -105,6 +106,18 @@ namespace TEvPrivate { {} }; + struct TEvImportSchemaMappingReady: public TEventLocal<TEvImportSchemaMappingReady, EvImportSchemaMappingReady> { + const ui64 ImportId; + const bool Success; + const TString Error; + + TEvImportSchemaMappingReady(ui64 id, bool success, const TString& error) + : ImportId(id) + , Success(success) + , Error(error) + {} + }; + struct TEvImportSchemeQueryResult: public TEventLocal<TEvImportSchemeQueryResult, EvImportSchemeQueryResult> { const ui64 ImportId; const ui32 ItemIdx; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index baa342f150..4c1c693933 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1596,6 +1596,8 @@ struct Schema : NIceDb::Schema { struct NextIndexIdx : Column<9, NScheme::NTypeIds::Uint32> {}; struct NextChangefeedIdx : Column<16, NScheme::NTypeIds::Uint32> {}; struct Issue : Column<10, NScheme::NTypeIds::Utf8> {}; + struct SrcPrefix : Column<17, NScheme::NTypeIds::Utf8> {}; + struct EncryptionIV : Column<18, NScheme::NTypeIds::String> {}; using TKey = TableKey<ImportId, Index>; using TColumns = TableColumns< @@ -1614,7 +1616,9 @@ struct Schema : NIceDb::Schema { WaitTxId, NextIndexIdx, NextChangefeedIdx, - Issue + Issue, + SrcPrefix, + EncryptionIV >; }; diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 676d59c9d7..0693679da5 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -1564,9 +1564,10 @@ value { } } - Y_UNIT_TEST(ExportImportOnSupportedDatatypes) { + void ExportImportOnSupportedDatatypesImpl(bool encrypted, bool commonPrefix) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableParameterizedDecimal(true)); + runtime.GetAppData().FeatureFlags.SetEnableEncryptedExport(true); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"_( @@ -1656,16 +1657,51 @@ value { TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); + TString encryptionSettings; + if (encrypted) { + encryptionSettings = R"(encryption_settings { + encryption_algorithm: "ChaCha20-Poly1305" + symmetric_key { + key: "Very very secret export key!!!!!" + } + })"; + } + TString exportItems, importItems; + if (commonPrefix) { + exportItems = R"( + source_path: "/MyRoot" + destination_prefix: "BackupPrefix" + items { + source_path: "Table" + } + )"; + importItems = R"( + source_prefix: "BackupPrefix" + destination_path: "/MyRoot/Restored" + )"; + } else { + exportItems = R"( + items { + source_path: "/MyRoot/Table" + destination_prefix: "Backup1" + } + )"; + importItems = R"( + items { + source_prefix: "Backup1" + destination_path: "/MyRoot/Restored" + } + )"; + } + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "Backup1" - } + %s + %s } - )", port)); + )", port, exportItems.c_str(), encryptionSettings.c_str())); env.TestWaitNotification(runtime, txId); TestGetExport(runtime, txId, "/MyRoot"); @@ -1673,12 +1709,10 @@ value { ImportFromS3Settings { endpoint: "localhost:%d" scheme: HTTP - items { - source_prefix: "Backup1" - destination_path: "/MyRoot/Restored" - } + %s + %s } - )", port)); + )", port, importItems.c_str(), encryptionSettings.c_str())); env.TestWaitNotification(runtime, txId); TestGetImport(runtime, txId, "/MyRoot"); @@ -1744,10 +1778,22 @@ value { auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns); NKqp::CompareYson(expectedJson, contentOriginalTable); - auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", readKeyDesc, readColumns); + auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, commonPrefix ? "Table" : "Restored", readKeyDesc, readColumns); NKqp::CompareYson(expectedJson, contentRestoredTable); } + Y_UNIT_TEST(ExportImportOnSupportedDatatypes) { + ExportImportOnSupportedDatatypesImpl(false, false); + } + + Y_UNIT_TEST(ExportImportOnSupportedDatatypesWithCommonDestPrefix) { + ExportImportOnSupportedDatatypesImpl(false, true); + } + + Y_UNIT_TEST(ExportImportOnSupportedDatatypesEncrypted) { + ExportImportOnSupportedDatatypesImpl(true, true); + } + Y_UNIT_TEST(ExportImportPg) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true)); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index d9dea01aab..f76ea2e948 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6259,6 +6259,16 @@ "ColumnId": 16, "ColumnName": "NextChangefeedIdx", "ColumnType": "Uint32" + }, + { + "ColumnId": 17, + "ColumnName": "SrcPrefix", + "ColumnType": "Utf8" + }, + { + "ColumnId": 18, + "ColumnName": "EncryptionIV", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -6280,7 +6290,9 @@ 13, 14, 15, - 16 + 16, + 17, + 18 ], "RoomID": 0, "Codec": 0, |