diff options
| author | Vasily Gerasimov <[email protected]> | 2025-05-19 15:19:31 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-19 15:19:31 +0300 |
| commit | d6e49c56fa759b8e47f18605e8a9d0d56761d3c7 (patch) | |
| tree | 3cc9892d6f4abfa4b44bf132b208295dec19de07 | |
| parent | e7add4b3d61d878be3ee870907388d3b7d23b553 (diff) | |
Fix after PR #16190: make PersistExportMetadata method, pass ExportInfo by ref in functions (#18425)
7 files changed, 274 insertions, 269 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index 2799057c437..28fe7a8e3d7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -17,23 +17,23 @@ namespace { issue.set_message(errorMessage); } - void FillIssues(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo) { - if (exportInfo->Issue) { - AddIssue(exprt, exportInfo->Issue); + void FillIssues(NKikimrExport::TExport& exprt, const TExportInfo& exportInfo) { + if (exportInfo.Issue) { + AddIssue(exprt, exportInfo.Issue); } - for (const auto& item : exportInfo->Items) { + for (const auto& item : exportInfo.Items) { if (item.Issue) { AddIssue(exprt, item.Issue); } } } - void FillItemProgress(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx, + void FillItemProgress(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx, Ydb::Export::ExportItemProgress& itemProgress) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); const auto opId = TOperationId(item.WaitTxId, FirstSubTxId); if (item.WaitTxId != InvalidTxId && ss->TxInFlight.contains(opId)) { @@ -99,7 +99,7 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn case TExportInfo::EState::Transferring: case TExportInfo::EState::Done: for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - FillItemProgress(this, exportInfo, itemIdx, *exprt.AddItemsProgress()); + FillItemProgress(this, *exportInfo, itemIdx, *exprt.AddItemsProgress()); } exprt.SetProgress(exportInfo->IsDone() ? Ydb::Export::ExportProgress::PROGRESS_DONE @@ -111,13 +111,13 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn break; case TExportInfo::EState::Cancellation: - FillIssues(exprt, exportInfo); + FillIssues(exprt, *exportInfo); exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_CANCELLATION); break; case TExportInfo::EState::Cancelled: exprt.SetStatus(Ydb::StatusIds::CANCELLED); - FillIssues(exprt, exportInfo); + FillIssues(exprt, *exportInfo); exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_CANCELLED); break; @@ -141,28 +141,28 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn } } -void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) { - db.Table<Schema::Exports>().Key(exportInfo->Id).Update( - NIceDb::TUpdate<Schema::Exports::Uid>(exportInfo->Uid), - NIceDb::TUpdate<Schema::Exports::Kind>(static_cast<ui8>(exportInfo->Kind)), - NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo->Settings), - NIceDb::TUpdate<Schema::Exports::DomainPathOwnerId>(exportInfo->DomainPathId.OwnerId), - NIceDb::TUpdate<Schema::Exports::DomainPathId>(exportInfo->DomainPathId.LocalPathId), - NIceDb::TUpdate<Schema::Exports::Items>(exportInfo->Items.size()), - NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo->EnableChecksums), - NIceDb::TUpdate<Schema::Exports::EnablePermissions>(exportInfo->EnablePermissions) +void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) { + db.Table<Schema::Exports>().Key(exportInfo.Id).Update( + NIceDb::TUpdate<Schema::Exports::Uid>(exportInfo.Uid), + NIceDb::TUpdate<Schema::Exports::Kind>(static_cast<ui8>(exportInfo.Kind)), + NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo.Settings), + NIceDb::TUpdate<Schema::Exports::DomainPathOwnerId>(exportInfo.DomainPathId.OwnerId), + NIceDb::TUpdate<Schema::Exports::DomainPathId>(exportInfo.DomainPathId.LocalPathId), + NIceDb::TUpdate<Schema::Exports::Items>(exportInfo.Items.size()), + NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo.EnableChecksums), + NIceDb::TUpdate<Schema::Exports::EnablePermissions>(exportInfo.EnablePermissions) ); - if (exportInfo->UserSID) { - db.Table<Schema::Exports>().Key(exportInfo->Id).Update( - NIceDb::TUpdate<Schema::Exports::UserSID>(*exportInfo->UserSID) + if (exportInfo.UserSID) { + db.Table<Schema::Exports>().Key(exportInfo.Id).Update( + NIceDb::TUpdate<Schema::Exports::UserSID>(*exportInfo.UserSID) ); } - for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - const auto& item = exportInfo->Items.at(itemIdx); + for (ui32 itemIdx : xrange(exportInfo.Items.size())) { + const auto& item = exportInfo.Items.at(itemIdx); - db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Update( + db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Update( NIceDb::TUpdate<Schema::ExportItems::SourcePathName>(item.SourcePathName), NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId), NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId), @@ -172,36 +172,43 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T } } -void TSchemeShard::PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) { - for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Delete(); +void TSchemeShard::PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) { + for (ui32 itemIdx : xrange(exportInfo.Items.size())) { + db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Delete(); } - db.Table<Schema::Exports>().Key(exportInfo->Id).Delete(); + db.Table<Schema::Exports>().Key(exportInfo.Id).Delete(); +} + +void TSchemeShard::PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) { + db.Table<Schema::Exports>().Key(exportInfo.Id).Update( + NIceDb::TUpdate<Schema::Exports::ExportOwnerPathId>(exportInfo.ExportPathId.OwnerId), + NIceDb::TUpdate<Schema::Exports::ExportPathId>(exportInfo.ExportPathId.LocalPathId) + ); } -void TSchemeShard::PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) { - db.Table<Schema::Exports>().Key(exportInfo->Id).Update( - NIceDb::TUpdate<Schema::Exports::ExportOwnerPathId>(exportInfo->ExportPathId.OwnerId), - NIceDb::TUpdate<Schema::Exports::ExportPathId>(exportInfo->ExportPathId.LocalPathId) +void TSchemeShard::PersistExportState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) { + db.Table<Schema::Exports>().Key(exportInfo.Id).Update( + NIceDb::TUpdate<Schema::Exports::State>(static_cast<ui8>(exportInfo.State)), + NIceDb::TUpdate<Schema::Exports::WaitTxId>(exportInfo.WaitTxId), + NIceDb::TUpdate<Schema::Exports::Issue>(exportInfo.Issue), + NIceDb::TUpdate<Schema::Exports::StartTime>(exportInfo.StartTime.Seconds()), + NIceDb::TUpdate<Schema::Exports::EndTime>(exportInfo.EndTime.Seconds()) ); } -void TSchemeShard::PersistExportState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) { - db.Table<Schema::Exports>().Key(exportInfo->Id).Update( - NIceDb::TUpdate<Schema::Exports::State>(static_cast<ui8>(exportInfo->State)), - NIceDb::TUpdate<Schema::Exports::WaitTxId>(exportInfo->WaitTxId), - NIceDb::TUpdate<Schema::Exports::Issue>(exportInfo->Issue), - NIceDb::TUpdate<Schema::Exports::StartTime>(exportInfo->StartTime.Seconds()), - NIceDb::TUpdate<Schema::Exports::EndTime>(exportInfo->EndTime.Seconds()) +void TSchemeShard::PersistExportMetadata(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) { + db.Table<Schema::Exports>().Key(exportInfo.Id).Update( + NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo.Settings), + NIceDb::TUpdate<Schema::Exports::ExportMetadata>(exportInfo.ExportMetadata) ); } -void TSchemeShard::PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); +void TSchemeShard::PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); - db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Update( + db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Update( NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)), NIceDb::TUpdate<Schema::ExportItems::BackupTxId>(item.WaitTxId), NIceDb::TUpdate<Schema::ExportItems::Issue>(item.Issue) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp index ec7635c7cd2..4e692695437 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp @@ -80,7 +80,7 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase { exportInfo->State = TExportInfo::EState::Cancellation; if (item.WaitTxId != InvalidTxId) { - Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo->Id); + Send(Self->SelfId(), CancelPropose(*exportInfo, item.WaitTxId), 0, exportInfo->Id); } } } @@ -90,7 +90,7 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase { } NIceDb::TNiceDb db(txc.DB); - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); Send(Request->Sender, std::move(response), 0, Request->Cookie); SendNotificationsIfFinished(exportInfo); @@ -169,12 +169,12 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas Self->TxIdToExport.erase(backupTxId); NIceDb::TNiceDb db(txc.DB); - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (cancelledItems == cancellableItems) { exportInfo->State = TExportInfo::EState::Cancelled; exportInfo->EndTime = TAppData::TimeProvider->Now(); - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); } SendNotificationsIfFinished(exportInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index a7e957fafde..7406ee6eacd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -117,7 +117,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::YT, settings, domainPath.Base()->PathId, request.GetPeerName()); TString explain; - if (!FillItems(exportInfo, settings, explain)) { + if (!FillItems(*exportInfo, settings, explain)) { return Reply( std::move(response), Ydb::StatusIds::BAD_REQUEST, @@ -138,7 +138,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport(); exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport(); TString explain; - if (!FillItems(exportInfo, settings, explain)) { + if (!FillItems(*exportInfo, settings, explain)) { return Reply( std::move(response), Ydb::StatusIds::BAD_REQUEST, @@ -159,11 +159,11 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } NIceDb::TNiceDb db(txc.DB); - Self->PersistCreateExport(db, exportInfo); + Self->PersistCreateExport(db, *exportInfo); exportInfo->State = TExportInfo::EState::CreateExportDir; exportInfo->StartTime = TAppData::TimeProvider->Now(); - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); Self->Exports[id] = exportInfo; if (uid) { @@ -218,12 +218,12 @@ private: } template <typename TSettings> - bool FillItems(TExportInfo::TPtr exportInfo, const TSettings& settings, TString& explain) { + bool FillItems(TExportInfo& exportInfo, const TSettings& settings, TString& explain) { TString commonSourcePath = GetCommonSourcePath(settings); if (commonSourcePath && commonSourcePath.back() != '/') { commonSourcePath.push_back('/'); } - exportInfo->Items.reserve(settings.items().size()); + exportInfo.Items.reserve(settings.items().size()); for (ui32 itemIdx : xrange(settings.items().size())) { const auto& item = settings.items(itemIdx); const TString srcPath = commonSourcePath + item.source_path(); @@ -243,8 +243,8 @@ private: } } - exportInfo->Items.emplace_back(srcPath, path.Base()->PathId, path->PathType); - exportInfo->PendingItems.push_back(itemIdx); + exportInfo.Items.emplace_back(srcPath, path.Base()->PathId, path->PathType); + exportInfo.PendingItems.push_back(itemIdx); } return true; @@ -330,32 +330,32 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } private: - void MkDir(TExportInfo::TPtr exportInfo, TTxId txId) { + void MkDir(const TExportInfo& exportInfo, TTxId txId) { LOG_I("TExport::TTxProgress: MkDir propose" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", txId# " << txId); - Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); + Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId); Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo)); } - void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) { + void CopyTables(const TExportInfo& exportInfo, TTxId txId) { LOG_I("TExport::TTxProgress: CopyTables propose" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", txId# " << txId); - Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); + Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId); Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo)); } - void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - auto& item = exportInfo->Items[itemIdx]; + void TransferData(TExportInfo& exportInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + auto& item = exportInfo.Items[itemIdx]; item.SubState = ESubState::Proposed; LOG_I("TExport::TTxProgress: Backup propose" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx) << ", txId# " << txId ); @@ -364,14 +364,14 @@ private: Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); } - void UploadScheme(TExportInfo::TPtr exportInfo, ui32 itemIdx, const TActorContext& ctx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - auto& item = exportInfo->Items[itemIdx]; + void UploadScheme(TExportInfo& exportInfo, ui32 itemIdx, const TActorContext& ctx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + auto& item = exportInfo.Items[itemIdx]; item.SubState = ESubState::Proposed; LOG_I("TExport::TTxProgress: UploadScheme" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx) ); @@ -380,7 +380,7 @@ private: || item.SourcePathType == NKikimrSchemeOp::EPathTypePersQueueGroup) { Ydb::Export::ExportToS3Settings exportSettings; - Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); const auto databaseRoot = CanonizePath(Self->RootPathElements); NBackup::TMetadata metadata; @@ -389,20 +389,20 @@ private: metadata.SetVersion(EnableChecksums ? 1 : 0); item.SchemeUploader = ctx.Register(CreateSchemeUploader( - Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, - exportSettings, databaseRoot, metadata.Serialize(), exportInfo->EnablePermissions + Self->SelfId(), exportInfo.Id, itemIdx, item.SourcePathId, + exportSettings, databaseRoot, metadata.Serialize(), exportInfo.EnablePermissions )); Self->RunningExportSchemeUploaders.emplace(item.SchemeUploader); } } - bool FillExportMetadata(TExportInfo::TPtr exportInfo, TString& issues) { - if (exportInfo->Kind != TExportInfo::EKind::S3) { + bool FillExportMetadata(TExportInfo& exportInfo, TString& issues) { + if (exportInfo.Kind != TExportInfo::EKind::S3) { return true; } Ydb::Export::ExportToS3Settings exportSettings; - Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); if (exportSettings.destination_prefix().empty()) { // No place to save backup metadata return true; @@ -416,19 +416,19 @@ private: TMaybe<NBackup::TEncryptionIV> iv; if (exportSettings.has_encryption_settings()) { iv = NBackup::TEncryptionIV::Generate(); - exportInfo->ExportMetadata.SetIV(iv->GetBinaryString()); - exportInfo->ExportMetadata.SetEncryptionAlgorithm(NBackup::NormalizeEncryptionAlgorithmName(exportSettings.encryption_settings().encryption_algorithm())); + exportInfo.ExportMetadata.SetIV(iv->GetBinaryString()); + exportInfo.ExportMetadata.SetEncryptionAlgorithm(NBackup::NormalizeEncryptionAlgorithmName(exportSettings.encryption_settings().encryption_algorithm())); } if (!exportSettings.compression().empty()) { - exportInfo->ExportMetadata.SetCompressionAlgorithm(exportSettings.compression()); + exportInfo.ExportMetadata.SetCompressionAlgorithm(exportSettings.compression()); } const TString sourcePathRoot = exportSettings.source_path().empty() ? CanonizePath(Self->RootPathElements) : CanonizePath(exportSettings.source_path()); for (ui32 itemIndex = 1; itemIndex <= static_cast<ui32>(exportSettings.items_size()); ++itemIndex) { Ydb::Export::ExportToS3Settings::Item& exportItem = *exportSettings.mutable_items(itemIndex - 1); - NKikimrSchemeOp::TExportMetadata::TSchemaMappingItem& schemaMappingItem = *exportInfo->ExportMetadata.AddSchemaMapping(); + NKikimrSchemeOp::TExportMetadata::TSchemaMappingItem& schemaMappingItem = *exportInfo.ExportMetadata.AddSchemaMapping(); // remove source path prefix TString exportPath = CanonizePath(exportItem.source_path()); @@ -467,59 +467,59 @@ private: schemaMappingItem.SetIV(NBackup::TEncryptionIV::Combine(*iv, NBackup::EBackupFileType::Metadata, itemIndex, 0).GetBinaryString()); } } - if (!exportSettings.SerializeToString(&exportInfo->Settings)) { + if (!exportSettings.SerializeToString(&exportInfo.Settings)) { issues = "Failed to serialize settings"; return false; } return true; } - bool UploadExportMetadata(TExportInfo::TPtr exportInfo, const TActorContext& ctx) { // returns true if we need to change state to UploadExportMetadata - if (exportInfo->Kind != TExportInfo::EKind::S3) { + bool UploadExportMetadata(TExportInfo& exportInfo, const TActorContext& ctx) { // returns true if we need to change state to UploadExportMetadata + if (exportInfo.Kind != TExportInfo::EKind::S3) { return false; } Ydb::Export::ExportToS3Settings exportSettings; - Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); if (exportSettings.destination_prefix().empty()) { // No place to save backup metadata return false; } - exportInfo->ExportMetadataUploader = ctx.Register( - CreateExportMetadataUploader(Self->SelfId(), exportInfo->Id, exportSettings, exportInfo->ExportMetadata)); - Self->RunningExportSchemeUploaders.emplace(exportInfo->ExportMetadataUploader); + exportInfo.ExportMetadataUploader = ctx.Register( + CreateExportMetadataUploader(Self->SelfId(), exportInfo.Id, exportSettings, exportInfo.ExportMetadata)); + Self->RunningExportSchemeUploaders.emplace(exportInfo.ExportMetadataUploader); return true; } - bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); + bool CancelTransferring(TExportInfo& exportInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); if (item.WaitTxId == InvalidTxId) { if (item.SubState == ESubState::Proposed) { - exportInfo->State = EState::Cancellation; + exportInfo.State = EState::Cancellation; } return false; } - exportInfo->State = EState::Cancellation; + exportInfo.State = EState::Cancellation; LOG_I("TExport::TTxProgress: cancel backup's tx" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx)); - Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo->Id); + Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo.Id); return true; } - void DropTable(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); + void DropTable(const TExportInfo& exportInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); LOG_I("TExport::TTxProgress: Drop propose" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx) << ", txId# " << txId); @@ -527,41 +527,41 @@ private: Send(Self->SelfId(), DropPropose(Self, txId, exportInfo, itemIdx)); } - void DropDir(TExportInfo::TPtr exportInfo, TTxId txId) { + void DropDir(const TExportInfo& exportInfo, TTxId txId) { LOG_I("TExport::TTxProgress: Drop propose" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", txId# " << txId); - Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); + Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId); Send(Self->SelfId(), DropPropose(Self, txId, exportInfo)); } - void AllocateTxId(TExportInfo::TPtr exportInfo) { + void AllocateTxId(const TExportInfo& exportInfo) { LOG_I("TExport::TTxProgress: Allocate txId" - << ": info# " << exportInfo->ToString()); + << ": info# " << exportInfo.ToString()); - Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); - Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id); + Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId); + Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo.Id); } - void AllocateTxId(TExportInfo::TPtr exportInfo, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - auto& item = exportInfo->Items.at(itemIdx); + void AllocateTxId(TExportInfo& exportInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + auto& item = exportInfo.Items.at(itemIdx); item.SubState = ESubState::AllocateTxId; LOG_I("TExport::TTxProgress: Allocate txId" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx)); Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); - Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id); + Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo.Id); } - void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + void PrepareAutoDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db) { bool isContinued = false; PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) { - exportInfo->PendingDropItems.push_back(itemIdx); + exportInfo.PendingDropItems.push_back(itemIdx); isContinued = true; AllocateTxId(exportInfo, itemIdx); }); @@ -574,29 +574,29 @@ private: Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId))); } - void SubscribeTx(TExportInfo::TPtr exportInfo) { + void SubscribeTx(const TExportInfo& exportInfo) { LOG_I("TExport::TTxProgress: Wait for completion" - << ": info# " << exportInfo->ToString()); + << ": info# " << exportInfo.ToString()); - Y_ABORT_UNLESS(exportInfo->WaitTxId != InvalidTxId); - SubscribeTx(exportInfo->WaitTxId); + Y_ABORT_UNLESS(exportInfo.WaitTxId != InvalidTxId); + SubscribeTx(exportInfo.WaitTxId); } - void SubscribeTx(TExportInfo::TPtr exportInfo, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - auto& item = exportInfo->Items.at(itemIdx); + void SubscribeTx(TExportInfo& exportInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + auto& item = exportInfo.Items.at(itemIdx); item.SubState = ESubState::Subscribed; LOG_I("TExport::TTxProgress: Wait for completion" - << ": info# " << exportInfo->ToString() + << ": info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx)); Y_ABORT_UNLESS(item.WaitTxId != InvalidTxId); SubscribeTx(item.WaitTxId); } - static TPathId ItemPathId(TSchemeShard* ss, TExportInfo::TPtr exportInfo, ui32 itemIdx) { + static TPathId ItemPathId(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx) { const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss); if (!itemPath.IsResolved()) { @@ -606,13 +606,13 @@ private: return itemPath.Base()->PathId; } - TTxId GetActiveCopyingTxId(TExportInfo::TPtr exportInfo) { - if (exportInfo->Items.size() < 1) { + TTxId GetActiveCopyingTxId(const TExportInfo& exportInfo) { + if (exportInfo.Items.size() < 1) { return InvalidTxId; } - for (size_t i : xrange(exportInfo->Items.size())) { - const auto& item = exportInfo->Items[i]; + for (size_t i : xrange(exportInfo.Items.size())) { + const auto& item = exportInfo.Items[i]; if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { // only tables can be targets of the copy tables operation @@ -633,9 +633,9 @@ private: return InvalidTxId; } - TTxId GetActiveBackupTxId(TExportInfo::TPtr exportInfo, ui32 itemIdx) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); + TTxId GetActiveBackupTxId(const TExportInfo& exportInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); Y_ABORT_UNLESS(item.State == EState::Transferring); @@ -663,36 +663,36 @@ private: } } - void Cancel(TExportInfo::TPtr exportInfo, ui32 itemIdx, TStringBuf marker) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - const auto& item = exportInfo->Items.at(itemIdx); + void Cancel(TExportInfo& exportInfo, ui32 itemIdx, TStringBuf marker) { + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); + const auto& item = exportInfo.Items.at(itemIdx); LOG_N("TExport::TTxProgress: " << marker << ", cancelling" - << ", info# " << exportInfo->ToString() + << ", info# " << exportInfo.ToString() << ", item# " << item.ToString(itemIdx)); - exportInfo->State = EState::Cancelled; + exportInfo.State = EState::Cancelled; - if (auto metadataUploader = std::exchange(exportInfo->ExportMetadataUploader, {})) { + if (auto metadataUploader = std::exchange(exportInfo.ExportMetadataUploader, {})) { Send(metadataUploader, new TEvents::TEvPoisonPill()); Self->RunningExportSchemeUploaders.erase(metadataUploader); } - for (ui32 i : xrange(exportInfo->Items.size())) { - KillChildActors(exportInfo->Items[i]); + for (ui32 i : xrange(exportInfo.Items.size())) { + KillChildActors(exportInfo.Items[i]); if (i == itemIdx) { continue; } - if (exportInfo->Items.at(i).State != EState::Transferring) { + if (exportInfo.Items.at(i).State != EState::Transferring) { continue; } CancelTransferring(exportInfo, i); } - if (exportInfo->State == EState::Cancelled) { - exportInfo->EndTime = TAppData::TimeProvider->Now(); + if (exportInfo.State == EState::Cancelled) { + exportInfo.EndTime = TAppData::TimeProvider->Now(); } } @@ -748,14 +748,14 @@ private: case EState::CreateExportDir: case EState::CopyTables: if (exportInfo->WaitTxId == InvalidTxId) { - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } else { - SubscribeTx(exportInfo); + SubscribeTx(*exportInfo); } break; case EState::UploadExportMetadata: - Y_ABORT_UNLESS(UploadExportMetadata(exportInfo, ctx)); + Y_ABORT_UNLESS(UploadExportMetadata(*exportInfo, ctx)); break; case EState::Transferring: { @@ -767,15 +767,15 @@ private: if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable && item.State <= EState::Transferring) { pendingTables.emplace_back(itemIdx); } else { - UploadScheme(exportInfo, itemIdx, ctx); + UploadScheme(*exportInfo, itemIdx, ctx); } } else { - SubscribeTx(exportInfo, itemIdx); + SubscribeTx(*exportInfo, itemIdx); } } exportInfo->PendingItems = std::move(pendingTables); for (ui32 itemIdx : exportInfo->PendingItems) { - AllocateTxId(exportInfo, itemIdx); + AllocateTxId(*exportInfo, itemIdx); } break; } @@ -790,17 +790,17 @@ private: continue; } - if (!CancelTransferring(exportInfo, itemIdx)) { - const TTxId txId = GetActiveBackupTxId(exportInfo, itemIdx); + if (!CancelTransferring(*exportInfo, itemIdx)) { + const TTxId txId = GetActiveBackupTxId(*exportInfo, itemIdx); if (txId == InvalidTxId) { item.State = EState::Cancelled; } else { item.WaitTxId = txId; - CancelTransferring(exportInfo, itemIdx); + CancelTransferring(*exportInfo, itemIdx); } - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); } } @@ -808,7 +808,7 @@ private: exportInfo->EndTime = TAppData::TimeProvider->Now(); } - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); SendNotificationsIfFinished(exportInfo); break; @@ -824,16 +824,16 @@ private: if (item.WaitTxId == InvalidTxId) { exportInfo->PendingDropItems.push_back(itemIdx); - AllocateTxId(exportInfo, itemIdx); + AllocateTxId(*exportInfo, itemIdx); } else { - SubscribeTx(exportInfo, itemIdx); + SubscribeTx(*exportInfo, itemIdx); } } } else { if (exportInfo->WaitTxId == InvalidTxId) { - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } else { - SubscribeTx(exportInfo); + SubscribeTx(*exportInfo); } } break; @@ -847,9 +847,9 @@ private: exportInfo->State = finalState; exportInfo->EndTime = TAppData::TimeProvider->Now(); - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); SendNotificationsIfFinished(exportInfo); - AuditLogExportEnd(*exportInfo.Get(), Self); + AuditLogExportEnd(*exportInfo, Self); } void OnAllocateResult() { @@ -873,11 +873,11 @@ private: switch (exportInfo->State) { case EState::CreateExportDir: - MkDir(exportInfo, txId); + MkDir(*exportInfo, txId); break; case EState::CopyTables: - CopyTables(exportInfo, txId); + CopyTables(*exportInfo, txId); break; case EState::Transferring: @@ -886,7 +886,7 @@ private: } itemIdx = PopFront(exportInfo->PendingItems); if (const auto type = exportInfo->Items.at(itemIdx).SourcePathType; type == NKikimrSchemeOp::EPathTypeTable) { - TransferData(exportInfo, itemIdx, txId); + TransferData(*exportInfo, itemIdx, txId); } else { LOG_W("TExport::TTxProgress: OnAllocateResult allocated a needless txId for an item transferring" << ": id# " << id @@ -901,9 +901,9 @@ private: case EState::Dropping: if (exportInfo->PendingDropItems) { itemIdx = PopFront(exportInfo->PendingDropItems); - DropTable(exportInfo, itemIdx, txId); + DropTable(*exportInfo, itemIdx, txId); } else { - DropDir(exportInfo, txId); + DropDir(*exportInfo, txId); } break; @@ -959,14 +959,14 @@ private: if (record.GetPathCreateTxId()) { txId = TTxId(record.GetPathCreateTxId()); } else { - txId = GetActiveCopyingTxId(exportInfo); + txId = GetActiveCopyingTxId(*exportInfo); } } break; case EState::Transferring: if (isMultipleMods) { - txId = GetActiveBackupTxId(exportInfo, itemIdx); + txId = GetActiveBackupTxId(*exportInfo, itemIdx); } break; @@ -985,9 +985,9 @@ private: THolder<IEventBase> ev; if (itemIdx < exportInfo->Items.size()) { - ev.Reset(DropPropose(Self, txId, exportInfo, itemIdx).Release()); + ev.Reset(DropPropose(Self, txId, *exportInfo, itemIdx).Release()); } else { - ev.Reset(DropPropose(Self, txId, exportInfo).Release()); + ev.Reset(DropPropose(Self, txId, *exportInfo).Release()); } ctx.TActivationContext::Schedule(TDuration::Seconds(10), @@ -1009,13 +1009,13 @@ private: item.State = EState::Cancelled; item.Issue = record.GetReason(); - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (!exportInfo->IsInProgress()) { return; } - Cancel(exportInfo, itemIdx, "unhappy propose"); + Cancel(*exportInfo, itemIdx, "unhappy propose"); } else { if (!exportInfo->IsInProgress()) { return; @@ -1048,7 +1048,7 @@ private: exportInfo->EndTime = TAppData::TimeProvider->Now(); } - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); return SendNotificationsIfFinished(exportInfo); } @@ -1058,21 +1058,21 @@ private: switch (exportInfo->State) { case EState::CreateExportDir: exportInfo->ExportPathId = Self->MakeLocalId(TLocalPathId(record.GetPathId())); - Self->PersistExportPathId(db, exportInfo); + Self->PersistExportPathId(db, *exportInfo); exportInfo->WaitTxId = txId; - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); break; case EState::CopyTables: exportInfo->WaitTxId = txId; - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); break; case EState::Transferring: Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); exportInfo->Items.at(itemIdx).WaitTxId = txId; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); break; case EState::AutoDropping: @@ -1080,19 +1080,19 @@ private: if (!exportInfo->AllItemsAreDropped()) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); exportInfo->Items.at(itemIdx).WaitTxId = txId; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); } else { exportInfo->WaitTxId = txId; - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); } break; case EState::Cancellation: if (itemIdx < exportInfo->Items.size()) { exportInfo->Items.at(itemIdx).WaitTxId = txId; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); - CancelTransferring(exportInfo, itemIdx); + CancelTransferring(*exportInfo, itemIdx); } return; @@ -1145,32 +1145,32 @@ private: if (!result.Success) { item.State = EState::Cancelled; item.Issue = result.Error; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (!exportInfo->IsInProgress()) { return; } - Cancel(exportInfo, itemIdx, "unsuccessful scheme upload"); + Cancel(*exportInfo, itemIdx, "unsuccessful scheme upload"); - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); return SendNotificationsIfFinished(exportInfo); } if (exportInfo->State == EState::Transferring) { item.State = EState::Done; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { if (!AppData()->FeatureFlags.GetEnableExportAutoDropping()) { EndExport(exportInfo, EState::Done, db); } else { - PrepareAutoDropping(Self, exportInfo, db); + PrepareAutoDropping(Self, *exportInfo, db); } } } else if (exportInfo->State == EState::Cancellation) { item.State = EState::Cancelled; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (AllOf(exportInfo->Items, [](const TExportInfo::TItem& item) { // on cancellation we wait only for transferring items @@ -1213,28 +1213,28 @@ private: exportInfo->EndTime = TAppData::TimeProvider->Now(); exportInfo->Issue = result.Error; - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); return SendNotificationsIfFinished(exportInfo); } Y_ABORT_UNLESS(exportInfo->State == EState::UploadExportMetadata); if (AnyOf(exportInfo->Items, &IsPathTypeTable)) { exportInfo->State = EState::CopyTables; - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } else { // None of the items is a table. for (ui32 i : xrange(exportInfo->Items.size())) { exportInfo->Items[i].State = EState::Transferring; - Self->PersistExportItemState(db, exportInfo, i); + Self->PersistExportItemState(db, *exportInfo, i); - UploadScheme(exportInfo, i, ctx); + UploadScheme(*exportInfo, i, ctx); } exportInfo->State = EState::Transferring; exportInfo->PendingItems.clear(); } - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); } void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) { @@ -1287,31 +1287,28 @@ private: exportInfo->WaitTxId = InvalidTxId; const bool supportEncryptedExport = AppData()->FeatureFlags.GetEnableEncryptedExport(); - if (TString issues; supportEncryptedExport && !FillExportMetadata(exportInfo, issues)) { + if (TString issues; supportEncryptedExport && !FillExportMetadata(*exportInfo, issues)) { exportInfo->State = EState::Cancelled; exportInfo->EndTime = TAppData::TimeProvider->Now(); exportInfo->Issue = issues; break; } - if (supportEncryptedExport && UploadExportMetadata(exportInfo, ctx)) { + if (supportEncryptedExport && UploadExportMetadata(*exportInfo, ctx)) { exportInfo->State = EState::UploadExportMetadata; // Persist modified metadata and new settings - db.Table<Schema::Exports>().Key(exportInfo->Id).Update( - NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo->Settings), - NIceDb::TUpdate<Schema::Exports::ExportMetadata>(exportInfo->ExportMetadata) - ); + Self->PersistExportMetadata(db, *exportInfo); } else if (AnyOf(exportInfo->Items, &IsPathTypeTable)) { exportInfo->State = EState::CopyTables; - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } else { // None of the items is a table. for (ui32 i : xrange(exportInfo->Items.size())) { exportInfo->Items[i].State = EState::Transferring; - Self->PersistExportItemState(db, exportInfo, i); + Self->PersistExportItemState(db, *exportInfo, i); - UploadScheme(exportInfo, i, ctx); + UploadScheme(*exportInfo, i, ctx); } exportInfo->State = EState::Transferring; @@ -1324,7 +1321,7 @@ private: if (exportInfo->DependencyTxIds.contains(txId)) { exportInfo->DependencyTxIds.erase(txId); if (exportInfo->DependencyTxIds.empty()) { - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } return; } @@ -1335,17 +1332,17 @@ private: for (ui32 itemIdx : xrange(exportInfo->Items.size())) { auto& item = exportInfo->Items[itemIdx]; item.State = EState::Transferring; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { tables.emplace_back(itemIdx); } else { - UploadScheme(exportInfo, itemIdx, ctx); + UploadScheme(*exportInfo, itemIdx, ctx); } } exportInfo->PendingItems = std::move(tables); for (ui32 itemIdx : exportInfo->PendingItems) { - AllocateTxId(exportInfo, itemIdx); + AllocateTxId(*exportInfo, itemIdx); } break; } @@ -1359,9 +1356,9 @@ private: bool itemHasIssues = false; if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { - if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { + if (const auto issue = GetIssues(ItemPathId(Self, *exportInfo, itemIdx), txId)) { item.Issue = *issue; - Cancel(exportInfo, itemIdx, "issues during backing up"); + Cancel(*exportInfo, itemIdx, "issues during backing up"); itemHasIssues = true; } } @@ -1370,11 +1367,11 @@ private: exportInfo->State = EState::Done; exportInfo->EndTime = TAppData::TimeProvider->Now(); } else { - PrepareAutoDropping(Self, exportInfo, db); + PrepareAutoDropping(Self, *exportInfo, db); } } - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); break; } @@ -1386,10 +1383,10 @@ private: item.State = EState::Dropped; item.WaitTxId = InvalidTxId; - Self->PersistExportItemState(db, exportInfo, itemIdx); + Self->PersistExportItemState(db, *exportInfo, itemIdx); if (exportInfo->AllItemsAreDropped()) { - AllocateTxId(exportInfo); + AllocateTxId(*exportInfo); } } else { SendNotificationsIfFinished(exportInfo, true); // for tests @@ -1403,7 +1400,7 @@ private: } Self->Exports.erase(exportInfo->Id); - Self->PersistRemoveExport(db, exportInfo); + Self->PersistRemoveExport(db, *exportInfo); } return; @@ -1411,7 +1408,7 @@ private: return SendNotificationsIfFinished(exportInfo); } - Self->PersistExportState(db, exportInfo); + Self->PersistExportState(db, *exportInfo); SendNotificationsIfFinished(exportInfo); if (exportInfo->IsFinished()) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp index d4455cc4083..f02780a26d0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp @@ -72,13 +72,13 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase { } Self->Exports.erase(exportInfo->Id); - Self->PersistRemoveExport(db, exportInfo); + Self->PersistRemoveExport(db, *exportInfo); } else { LOG_D("TExport::TTxForget, dropping export tables" << ", info: " << exportInfo->ToString() ); - - PrepareDropping(Self, exportInfo, db); + + PrepareDropping(Self, *exportInfo, db); Progress = true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 33d3c1ff0db..cc3f59c4933 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -14,24 +14,24 @@ namespace NSchemeShard { THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ) { auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID()); auto& record = propose->Record; - if (exportInfo->UserSID) { - record.SetOwner(*exportInfo->UserSID); + if (exportInfo.UserSID) { + record.SetOwner(*exportInfo.UserSID); } auto& modifyScheme = *record.AddTransaction(); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpMkDir); modifyScheme.SetInternal(true); - const TPath domainPath = TPath::Init(exportInfo->DomainPathId, ss); + const TPath domainPath = TPath::Init(exportInfo.DomainPathId, ss); modifyScheme.SetWorkingDir(domainPath.PathString()); auto& mkDir = *modifyScheme.MutableMkDir(); - mkDir.SetName(Sprintf("export-%" PRIu64, exportInfo->Id)); + mkDir.SetName(Sprintf("export-%" PRIu64, exportInfo.Id)); return propose; } @@ -39,13 +39,13 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose( THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ) { auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID()); auto& record = propose->Record; - if (exportInfo->UserSID) { - record.SetOwner(*exportInfo->UserSID); + if (exportInfo.UserSID) { + record.SetOwner(*exportInfo.UserSID); } auto& modifyScheme = *record.AddTransaction(); @@ -53,13 +53,13 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose( modifyScheme.SetInternal(true); auto& copyTables = *modifyScheme.MutableCreateConsistentCopyTables()->MutableCopyTableDescriptions(); - copyTables.Reserve(exportInfo->Items.size()); + copyTables.Reserve(exportInfo.Items.size()); - const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss); + const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss); const TString& exportPathName = exportPath.PathString(); - for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - const auto& item = exportInfo->Items.at(itemIdx); + for (ui32 itemIdx : xrange(exportInfo.Items.size())) { + const auto& item = exportInfo.Items.at(itemIdx); if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { continue; } @@ -156,10 +156,10 @@ void FillPartitioning(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& desc THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, ui32 itemIdx ) { - Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); + Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size()); auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID()); @@ -167,15 +167,15 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpBackup); modifyScheme.SetInternal(true); - const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss); + const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss); const TString& exportPathName = exportPath.PathString(); modifyScheme.SetWorkingDir(exportPathName); auto& task = *modifyScheme.MutableBackup(); task.SetTableName(ToString(itemIdx)); - task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID)); + task.SetNeedToBill(!exportInfo.UserSID || !ss->SystemBackupSIDs.contains(*exportInfo.UserSID)); - const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss); + const TPath sourcePath = TPath::Init(exportInfo.Items[itemIdx].SourcePathId, ss); const TPath exportItemPath = exportPath.Child(ToString(itemIdx)); if (sourcePath.IsResolved() && exportItemPath.IsResolved()) { auto sourceDescription = GetTableDescription(ss, sourcePath.Base()->PathId); @@ -195,14 +195,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( task.MutableTable()->CopyFrom(sourceDescription); } - task.SetSnapshotStep(exportInfo->SnapshotStep); - task.SetSnapshotTxId(exportInfo->SnapshotTxId); + task.SetSnapshotStep(exportInfo.SnapshotStep); + task.SetSnapshotTxId(exportInfo.SnapshotTxId); - switch (exportInfo->Kind) { + switch (exportInfo.Kind) { case TExportInfo::EKind::YT: { Ydb::Export::ExportToYtSettings exportSettings; - Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); task.SetNumberOfRetries(exportSettings.number_of_retries()); auto& backupSettings = *task.MutableYTSettings(); @@ -217,7 +217,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( case TExportInfo::EKind::S3: { Ydb::Export::ExportToS3Settings exportSettings; - Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); task.SetNumberOfRetries(exportSettings.number_of_retries()); auto& backupSettings = *task.MutableS3Settings(); @@ -248,14 +248,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression)); } - task.SetEnableChecksums(exportInfo->EnableChecksums); - task.SetEnablePermissions(exportInfo->EnablePermissions); + task.SetEnableChecksums(exportInfo.EnableChecksums); + task.SetEnablePermissions(exportInfo.EnablePermissions); if (exportSettings.has_encryption_settings()) { auto& encryptionSettings = *task.MutableEncryptionSettings(); - encryptionSettings.SetEncryptionAlgorithm(exportInfo->ExportMetadata.GetEncryptionAlgorithm()); - Y_ABORT_UNLESS(itemIdx < exportInfo->ExportMetadata.SchemaMappingSize()); - encryptionSettings.SetIV(exportInfo->ExportMetadata.GetSchemaMapping(itemIdx).GetIV()); + encryptionSettings.SetEncryptionAlgorithm(exportInfo.ExportMetadata.GetEncryptionAlgorithm()); + Y_ABORT_UNLESS(itemIdx < exportInfo.ExportMetadata.SchemaMappingSize()); + encryptionSettings.SetIV(exportInfo.ExportMetadata.GetSchemaMapping(itemIdx).GetIV()); *encryptionSettings.MutableSymmetricKey() = exportSettings.encryption_settings().symmetric_key(); } } @@ -268,7 +268,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, ui32 itemIdx ) { auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID()); @@ -277,7 +277,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); modifyScheme.SetInternal(true); - const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss); + const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss); modifyScheme.SetWorkingDir(exportPath.PathString()); auto& drop = *modifyScheme.MutableDrop(); @@ -289,7 +289,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ) { auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID()); @@ -297,30 +297,30 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpRmDir); modifyScheme.SetInternal(true); - const TPath domainPath = TPath::Init(exportInfo->DomainPathId, ss); + const TPath domainPath = TPath::Init(exportInfo.DomainPathId, ss); modifyScheme.SetWorkingDir(domainPath.PathString()); auto& drop = *modifyScheme.MutableDrop(); - drop.SetName(Sprintf("export-%" PRIu64, exportInfo->Id)); + drop.SetName(Sprintf("export-%" PRIu64, exportInfo.Id)); return propose; } THolder<TEvSchemeShard::TEvCancelTx> CancelPropose( - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, TTxId backupTxId ) { auto propose = MakeHolder<TEvSchemeShard::TEvCancelTx>(); auto& record = propose->Record; - record.SetTxId(exportInfo->Id); + record.SetTxId(exportInfo.Id); record.SetTargetTxId(ui64(backupTxId)); return propose; } -TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx) { - const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss); +TString ExportItemPathName(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx) { + const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss); return ExportItemPathName(exportPath.PathString(), itemIdx); } @@ -330,26 +330,26 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) { void PrepareDropping( TSchemeShard* ss, - TExportInfo::TPtr exportInfo, + TExportInfo& exportInfo, NIceDb::TNiceDb& db, TExportInfo::EState droppingState, std::function<void(ui64)> func) { Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState)); - exportInfo->WaitTxId = InvalidTxId; - exportInfo->State = droppingState; + exportInfo.WaitTxId = InvalidTxId; + exportInfo.State = droppingState; ss->PersistExportState(db, exportInfo); - for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - auto& item = exportInfo->Items.at(itemIdx); + for (ui32 itemIdx : xrange(exportInfo.Items.size())) { + auto& item = exportInfo.Items.at(itemIdx); item.WaitTxId = InvalidTxId; item.State = TExportInfo::EState::Dropped; const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss); if (itemPath.IsResolved() && !itemPath.IsDeleted()) { item.State = TExportInfo::EState::Dropping; - if (exportInfo->State == TExportInfo::EState::AutoDropping) { + if (exportInfo.State == TExportInfo::EState::AutoDropping) { func(itemIdx); } } @@ -358,7 +358,7 @@ void PrepareDropping( } } -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { +void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db) { PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){}); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h index 97f9d18e33e..37938ec1252 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h @@ -10,46 +10,46 @@ namespace NSchemeShard { THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ); THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ); THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, ui32 itemIdx ); THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, ui32 itemIdx ); THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose( TSchemeShard* ss, TTxId txId, - const TExportInfo::TPtr exportInfo + const TExportInfo& exportInfo ); THolder<TEvSchemeShard::TEvCancelTx> CancelPropose( - const TExportInfo::TPtr exportInfo, + const TExportInfo& exportInfo, TTxId backupTxId ); -TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx); +TString ExportItemPathName(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx); TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx); -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, +void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db, TExportInfo::EState droppingState, std::function<void(ui64)> func); -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db); +void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db); } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index b740da24634..40f6a4dc542 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1269,11 +1269,12 @@ public: void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo); - static void PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo); - static void PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo); - static void PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo); - static void PersistExportState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo); - static void PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo, ui32 targetIdx); + static void PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo); + static void PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo); + static void PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo& exportInfo); + static void PersistExportState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo); + static void PersistExportMetadata(NIceDb::TNiceDb& db, const TExportInfo& exportInfo); + static void PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo, ui32 targetIdx); struct TExport { struct TTxCreate; |
