summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <[email protected]>2025-05-19 15:19:31 +0300
committerGitHub <[email protected]>2025-05-19 15:19:31 +0300
commitd6e49c56fa759b8e47f18605e8a9d0d56761d3c7 (patch)
tree3cc9892d6f4abfa4b44bf132b208295dec19de07
parente7add4b3d61d878be3ee870907388d3b7d23b553 (diff)
Fix after PR #16190: make PersistExportMetadata method, pass ExportInfo by ref in functions (#18425)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export.cpp97
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__create.cpp317
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__forget.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp86
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h18
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h11
7 files changed, 274 insertions, 269 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp
index 2799057c437..28fe7a8e3d7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp
@@ -17,23 +17,23 @@ namespace {
issue.set_message(errorMessage);
}
- void FillIssues(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo) {
- if (exportInfo->Issue) {
- AddIssue(exprt, exportInfo->Issue);
+ void FillIssues(NKikimrExport::TExport& exprt, const TExportInfo& exportInfo) {
+ if (exportInfo.Issue) {
+ AddIssue(exprt, exportInfo.Issue);
}
- for (const auto& item : exportInfo->Items) {
+ for (const auto& item : exportInfo.Items) {
if (item.Issue) {
AddIssue(exprt, item.Issue);
}
}
}
- void FillItemProgress(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx,
+ void FillItemProgress(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx,
Ydb::Export::ExportItemProgress& itemProgress) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
const auto opId = TOperationId(item.WaitTxId, FirstSubTxId);
if (item.WaitTxId != InvalidTxId && ss->TxInFlight.contains(opId)) {
@@ -99,7 +99,7 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn
case TExportInfo::EState::Transferring:
case TExportInfo::EState::Done:
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
- FillItemProgress(this, exportInfo, itemIdx, *exprt.AddItemsProgress());
+ FillItemProgress(this, *exportInfo, itemIdx, *exprt.AddItemsProgress());
}
exprt.SetProgress(exportInfo->IsDone()
? Ydb::Export::ExportProgress::PROGRESS_DONE
@@ -111,13 +111,13 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn
break;
case TExportInfo::EState::Cancellation:
- FillIssues(exprt, exportInfo);
+ FillIssues(exprt, *exportInfo);
exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_CANCELLATION);
break;
case TExportInfo::EState::Cancelled:
exprt.SetStatus(Ydb::StatusIds::CANCELLED);
- FillIssues(exprt, exportInfo);
+ FillIssues(exprt, *exportInfo);
exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_CANCELLED);
break;
@@ -141,28 +141,28 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn
}
}
-void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) {
- db.Table<Schema::Exports>().Key(exportInfo->Id).Update(
- NIceDb::TUpdate<Schema::Exports::Uid>(exportInfo->Uid),
- NIceDb::TUpdate<Schema::Exports::Kind>(static_cast<ui8>(exportInfo->Kind)),
- NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo->Settings),
- NIceDb::TUpdate<Schema::Exports::DomainPathOwnerId>(exportInfo->DomainPathId.OwnerId),
- NIceDb::TUpdate<Schema::Exports::DomainPathId>(exportInfo->DomainPathId.LocalPathId),
- NIceDb::TUpdate<Schema::Exports::Items>(exportInfo->Items.size()),
- NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo->EnableChecksums),
- NIceDb::TUpdate<Schema::Exports::EnablePermissions>(exportInfo->EnablePermissions)
+void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) {
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Update(
+ NIceDb::TUpdate<Schema::Exports::Uid>(exportInfo.Uid),
+ NIceDb::TUpdate<Schema::Exports::Kind>(static_cast<ui8>(exportInfo.Kind)),
+ NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo.Settings),
+ NIceDb::TUpdate<Schema::Exports::DomainPathOwnerId>(exportInfo.DomainPathId.OwnerId),
+ NIceDb::TUpdate<Schema::Exports::DomainPathId>(exportInfo.DomainPathId.LocalPathId),
+ NIceDb::TUpdate<Schema::Exports::Items>(exportInfo.Items.size()),
+ NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo.EnableChecksums),
+ NIceDb::TUpdate<Schema::Exports::EnablePermissions>(exportInfo.EnablePermissions)
);
- if (exportInfo->UserSID) {
- db.Table<Schema::Exports>().Key(exportInfo->Id).Update(
- NIceDb::TUpdate<Schema::Exports::UserSID>(*exportInfo->UserSID)
+ if (exportInfo.UserSID) {
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Update(
+ NIceDb::TUpdate<Schema::Exports::UserSID>(*exportInfo.UserSID)
);
}
- for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
- const auto& item = exportInfo->Items.at(itemIdx);
+ for (ui32 itemIdx : xrange(exportInfo.Items.size())) {
+ const auto& item = exportInfo.Items.at(itemIdx);
- db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Update(
+ db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ExportItems::SourcePathName>(item.SourcePathName),
NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId),
NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId),
@@ -172,36 +172,43 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T
}
}
-void TSchemeShard::PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) {
- for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
- db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Delete();
+void TSchemeShard::PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) {
+ for (ui32 itemIdx : xrange(exportInfo.Items.size())) {
+ db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Delete();
}
- db.Table<Schema::Exports>().Key(exportInfo->Id).Delete();
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Delete();
+}
+
+void TSchemeShard::PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) {
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Update(
+ NIceDb::TUpdate<Schema::Exports::ExportOwnerPathId>(exportInfo.ExportPathId.OwnerId),
+ NIceDb::TUpdate<Schema::Exports::ExportPathId>(exportInfo.ExportPathId.LocalPathId)
+ );
}
-void TSchemeShard::PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) {
- db.Table<Schema::Exports>().Key(exportInfo->Id).Update(
- NIceDb::TUpdate<Schema::Exports::ExportOwnerPathId>(exportInfo->ExportPathId.OwnerId),
- NIceDb::TUpdate<Schema::Exports::ExportPathId>(exportInfo->ExportPathId.LocalPathId)
+void TSchemeShard::PersistExportState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) {
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Update(
+ NIceDb::TUpdate<Schema::Exports::State>(static_cast<ui8>(exportInfo.State)),
+ NIceDb::TUpdate<Schema::Exports::WaitTxId>(exportInfo.WaitTxId),
+ NIceDb::TUpdate<Schema::Exports::Issue>(exportInfo.Issue),
+ NIceDb::TUpdate<Schema::Exports::StartTime>(exportInfo.StartTime.Seconds()),
+ NIceDb::TUpdate<Schema::Exports::EndTime>(exportInfo.EndTime.Seconds())
);
}
-void TSchemeShard::PersistExportState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo) {
- db.Table<Schema::Exports>().Key(exportInfo->Id).Update(
- NIceDb::TUpdate<Schema::Exports::State>(static_cast<ui8>(exportInfo->State)),
- NIceDb::TUpdate<Schema::Exports::WaitTxId>(exportInfo->WaitTxId),
- NIceDb::TUpdate<Schema::Exports::Issue>(exportInfo->Issue),
- NIceDb::TUpdate<Schema::Exports::StartTime>(exportInfo->StartTime.Seconds()),
- NIceDb::TUpdate<Schema::Exports::EndTime>(exportInfo->EndTime.Seconds())
+void TSchemeShard::PersistExportMetadata(NIceDb::TNiceDb& db, const TExportInfo& exportInfo) {
+ db.Table<Schema::Exports>().Key(exportInfo.Id).Update(
+ NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo.Settings),
+ NIceDb::TUpdate<Schema::Exports::ExportMetadata>(exportInfo.ExportMetadata)
);
}
-void TSchemeShard::PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+void TSchemeShard::PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
- db.Table<Schema::ExportItems>().Key(exportInfo->Id, itemIdx).Update(
+ db.Table<Schema::ExportItems>().Key(exportInfo.Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ExportItems::BackupTxId>(item.WaitTxId),
NIceDb::TUpdate<Schema::ExportItems::Issue>(item.Issue)
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp
index ec7635c7cd2..4e692695437 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp
@@ -80,7 +80,7 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
exportInfo->State = TExportInfo::EState::Cancellation;
if (item.WaitTxId != InvalidTxId) {
- Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo->Id);
+ Send(Self->SelfId(), CancelPropose(*exportInfo, item.WaitTxId), 0, exportInfo->Id);
}
}
}
@@ -90,7 +90,7 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
}
NIceDb::TNiceDb db(txc.DB);
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
Send(Request->Sender, std::move(response), 0, Request->Cookie);
SendNotificationsIfFinished(exportInfo);
@@ -169,12 +169,12 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
Self->TxIdToExport.erase(backupTxId);
NIceDb::TNiceDb db(txc.DB);
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (cancelledItems == cancellableItems) {
exportInfo->State = TExportInfo::EState::Cancelled;
exportInfo->EndTime = TAppData::TimeProvider->Now();
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
}
SendNotificationsIfFinished(exportInfo);
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
index a7e957fafde..7406ee6eacd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
@@ -117,7 +117,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::YT, settings, domainPath.Base()->PathId, request.GetPeerName());
TString explain;
- if (!FillItems(exportInfo, settings, explain)) {
+ if (!FillItems(*exportInfo, settings, explain)) {
return Reply(
std::move(response),
Ydb::StatusIds::BAD_REQUEST,
@@ -138,7 +138,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport();
exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport();
TString explain;
- if (!FillItems(exportInfo, settings, explain)) {
+ if (!FillItems(*exportInfo, settings, explain)) {
return Reply(
std::move(response),
Ydb::StatusIds::BAD_REQUEST,
@@ -159,11 +159,11 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
NIceDb::TNiceDb db(txc.DB);
- Self->PersistCreateExport(db, exportInfo);
+ Self->PersistCreateExport(db, *exportInfo);
exportInfo->State = TExportInfo::EState::CreateExportDir;
exportInfo->StartTime = TAppData::TimeProvider->Now();
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
Self->Exports[id] = exportInfo;
if (uid) {
@@ -218,12 +218,12 @@ private:
}
template <typename TSettings>
- bool FillItems(TExportInfo::TPtr exportInfo, const TSettings& settings, TString& explain) {
+ bool FillItems(TExportInfo& exportInfo, const TSettings& settings, TString& explain) {
TString commonSourcePath = GetCommonSourcePath(settings);
if (commonSourcePath && commonSourcePath.back() != '/') {
commonSourcePath.push_back('/');
}
- exportInfo->Items.reserve(settings.items().size());
+ exportInfo.Items.reserve(settings.items().size());
for (ui32 itemIdx : xrange(settings.items().size())) {
const auto& item = settings.items(itemIdx);
const TString srcPath = commonSourcePath + item.source_path();
@@ -243,8 +243,8 @@ private:
}
}
- exportInfo->Items.emplace_back(srcPath, path.Base()->PathId, path->PathType);
- exportInfo->PendingItems.push_back(itemIdx);
+ exportInfo.Items.emplace_back(srcPath, path.Base()->PathId, path->PathType);
+ exportInfo.PendingItems.push_back(itemIdx);
}
return true;
@@ -330,32 +330,32 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
private:
- void MkDir(TExportInfo::TPtr exportInfo, TTxId txId) {
+ void MkDir(const TExportInfo& exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: MkDir propose"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", txId# " << txId);
- Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
+ Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId);
Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo));
}
- void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
+ void CopyTables(const TExportInfo& exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: CopyTables propose"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", txId# " << txId);
- Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
+ Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId);
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
}
- void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- auto& item = exportInfo->Items[itemIdx];
+ void TransferData(TExportInfo& exportInfo, ui32 itemIdx, TTxId txId) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ auto& item = exportInfo.Items[itemIdx];
item.SubState = ESubState::Proposed;
LOG_I("TExport::TTxProgress: Backup propose"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId
);
@@ -364,14 +364,14 @@ private:
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
}
- void UploadScheme(TExportInfo::TPtr exportInfo, ui32 itemIdx, const TActorContext& ctx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- auto& item = exportInfo->Items[itemIdx];
+ void UploadScheme(TExportInfo& exportInfo, ui32 itemIdx, const TActorContext& ctx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ auto& item = exportInfo.Items[itemIdx];
item.SubState = ESubState::Proposed;
LOG_I("TExport::TTxProgress: UploadScheme"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx)
);
@@ -380,7 +380,7 @@ private:
|| item.SourcePathType == NKikimrSchemeOp::EPathTypePersQueueGroup)
{
Ydb::Export::ExportToS3Settings exportSettings;
- Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
+ Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings));
const auto databaseRoot = CanonizePath(Self->RootPathElements);
NBackup::TMetadata metadata;
@@ -389,20 +389,20 @@ private:
metadata.SetVersion(EnableChecksums ? 1 : 0);
item.SchemeUploader = ctx.Register(CreateSchemeUploader(
- Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId,
- exportSettings, databaseRoot, metadata.Serialize(), exportInfo->EnablePermissions
+ Self->SelfId(), exportInfo.Id, itemIdx, item.SourcePathId,
+ exportSettings, databaseRoot, metadata.Serialize(), exportInfo.EnablePermissions
));
Self->RunningExportSchemeUploaders.emplace(item.SchemeUploader);
}
}
- bool FillExportMetadata(TExportInfo::TPtr exportInfo, TString& issues) {
- if (exportInfo->Kind != TExportInfo::EKind::S3) {
+ bool FillExportMetadata(TExportInfo& exportInfo, TString& issues) {
+ if (exportInfo.Kind != TExportInfo::EKind::S3) {
return true;
}
Ydb::Export::ExportToS3Settings exportSettings;
- Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
+ Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings));
if (exportSettings.destination_prefix().empty()) { // No place to save backup metadata
return true;
@@ -416,19 +416,19 @@ private:
TMaybe<NBackup::TEncryptionIV> iv;
if (exportSettings.has_encryption_settings()) {
iv = NBackup::TEncryptionIV::Generate();
- exportInfo->ExportMetadata.SetIV(iv->GetBinaryString());
- exportInfo->ExportMetadata.SetEncryptionAlgorithm(NBackup::NormalizeEncryptionAlgorithmName(exportSettings.encryption_settings().encryption_algorithm()));
+ exportInfo.ExportMetadata.SetIV(iv->GetBinaryString());
+ exportInfo.ExportMetadata.SetEncryptionAlgorithm(NBackup::NormalizeEncryptionAlgorithmName(exportSettings.encryption_settings().encryption_algorithm()));
}
if (!exportSettings.compression().empty()) {
- exportInfo->ExportMetadata.SetCompressionAlgorithm(exportSettings.compression());
+ exportInfo.ExportMetadata.SetCompressionAlgorithm(exportSettings.compression());
}
const TString sourcePathRoot = exportSettings.source_path().empty() ? CanonizePath(Self->RootPathElements) : CanonizePath(exportSettings.source_path());
for (ui32 itemIndex = 1; itemIndex <= static_cast<ui32>(exportSettings.items_size()); ++itemIndex) {
Ydb::Export::ExportToS3Settings::Item& exportItem = *exportSettings.mutable_items(itemIndex - 1);
- NKikimrSchemeOp::TExportMetadata::TSchemaMappingItem& schemaMappingItem = *exportInfo->ExportMetadata.AddSchemaMapping();
+ NKikimrSchemeOp::TExportMetadata::TSchemaMappingItem& schemaMappingItem = *exportInfo.ExportMetadata.AddSchemaMapping();
// remove source path prefix
TString exportPath = CanonizePath(exportItem.source_path());
@@ -467,59 +467,59 @@ private:
schemaMappingItem.SetIV(NBackup::TEncryptionIV::Combine(*iv, NBackup::EBackupFileType::Metadata, itemIndex, 0).GetBinaryString());
}
}
- if (!exportSettings.SerializeToString(&exportInfo->Settings)) {
+ if (!exportSettings.SerializeToString(&exportInfo.Settings)) {
issues = "Failed to serialize settings";
return false;
}
return true;
}
- bool UploadExportMetadata(TExportInfo::TPtr exportInfo, const TActorContext& ctx) { // returns true if we need to change state to UploadExportMetadata
- if (exportInfo->Kind != TExportInfo::EKind::S3) {
+ bool UploadExportMetadata(TExportInfo& exportInfo, const TActorContext& ctx) { // returns true if we need to change state to UploadExportMetadata
+ if (exportInfo.Kind != TExportInfo::EKind::S3) {
return false;
}
Ydb::Export::ExportToS3Settings exportSettings;
- Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
+ Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings));
if (exportSettings.destination_prefix().empty()) { // No place to save backup metadata
return false;
}
- exportInfo->ExportMetadataUploader = ctx.Register(
- CreateExportMetadataUploader(Self->SelfId(), exportInfo->Id, exportSettings, exportInfo->ExportMetadata));
- Self->RunningExportSchemeUploaders.emplace(exportInfo->ExportMetadataUploader);
+ exportInfo.ExportMetadataUploader = ctx.Register(
+ CreateExportMetadataUploader(Self->SelfId(), exportInfo.Id, exportSettings, exportInfo.ExportMetadata));
+ Self->RunningExportSchemeUploaders.emplace(exportInfo.ExportMetadataUploader);
return true;
}
- bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+ bool CancelTransferring(TExportInfo& exportInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
if (item.WaitTxId == InvalidTxId) {
if (item.SubState == ESubState::Proposed) {
- exportInfo->State = EState::Cancellation;
+ exportInfo.State = EState::Cancellation;
}
return false;
}
- exportInfo->State = EState::Cancellation;
+ exportInfo.State = EState::Cancellation;
LOG_I("TExport::TTxProgress: cancel backup's tx"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx));
- Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo->Id);
+ Send(Self->SelfId(), CancelPropose(exportInfo, item.WaitTxId), 0, exportInfo.Id);
return true;
}
- void DropTable(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+ void DropTable(const TExportInfo& exportInfo, ui32 itemIdx, TTxId txId) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
LOG_I("TExport::TTxProgress: Drop propose"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);
@@ -527,41 +527,41 @@ private:
Send(Self->SelfId(), DropPropose(Self, txId, exportInfo, itemIdx));
}
- void DropDir(TExportInfo::TPtr exportInfo, TTxId txId) {
+ void DropDir(const TExportInfo& exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: Drop propose"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", txId# " << txId);
- Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
+ Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId);
Send(Self->SelfId(), DropPropose(Self, txId, exportInfo));
}
- void AllocateTxId(TExportInfo::TPtr exportInfo) {
+ void AllocateTxId(const TExportInfo& exportInfo) {
LOG_I("TExport::TTxProgress: Allocate txId"
- << ": info# " << exportInfo->ToString());
+ << ": info# " << exportInfo.ToString());
- Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
- Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id);
+ Y_ABORT_UNLESS(exportInfo.WaitTxId == InvalidTxId);
+ Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo.Id);
}
- void AllocateTxId(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- auto& item = exportInfo->Items.at(itemIdx);
+ void AllocateTxId(TExportInfo& exportInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ auto& item = exportInfo.Items.at(itemIdx);
item.SubState = ESubState::AllocateTxId;
LOG_I("TExport::TTxProgress: Allocate txId"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx));
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
- Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id);
+ Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo.Id);
}
- void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
+ void PrepareAutoDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db) {
bool isContinued = false;
PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) {
- exportInfo->PendingDropItems.push_back(itemIdx);
+ exportInfo.PendingDropItems.push_back(itemIdx);
isContinued = true;
AllocateTxId(exportInfo, itemIdx);
});
@@ -574,29 +574,29 @@ private:
Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId)));
}
- void SubscribeTx(TExportInfo::TPtr exportInfo) {
+ void SubscribeTx(const TExportInfo& exportInfo) {
LOG_I("TExport::TTxProgress: Wait for completion"
- << ": info# " << exportInfo->ToString());
+ << ": info# " << exportInfo.ToString());
- Y_ABORT_UNLESS(exportInfo->WaitTxId != InvalidTxId);
- SubscribeTx(exportInfo->WaitTxId);
+ Y_ABORT_UNLESS(exportInfo.WaitTxId != InvalidTxId);
+ SubscribeTx(exportInfo.WaitTxId);
}
- void SubscribeTx(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- auto& item = exportInfo->Items.at(itemIdx);
+ void SubscribeTx(TExportInfo& exportInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ auto& item = exportInfo.Items.at(itemIdx);
item.SubState = ESubState::Subscribed;
LOG_I("TExport::TTxProgress: Wait for completion"
- << ": info# " << exportInfo->ToString()
+ << ": info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx));
Y_ABORT_UNLESS(item.WaitTxId != InvalidTxId);
SubscribeTx(item.WaitTxId);
}
- static TPathId ItemPathId(TSchemeShard* ss, TExportInfo::TPtr exportInfo, ui32 itemIdx) {
+ static TPathId ItemPathId(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx) {
const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss);
if (!itemPath.IsResolved()) {
@@ -606,13 +606,13 @@ private:
return itemPath.Base()->PathId;
}
- TTxId GetActiveCopyingTxId(TExportInfo::TPtr exportInfo) {
- if (exportInfo->Items.size() < 1) {
+ TTxId GetActiveCopyingTxId(const TExportInfo& exportInfo) {
+ if (exportInfo.Items.size() < 1) {
return InvalidTxId;
}
- for (size_t i : xrange(exportInfo->Items.size())) {
- const auto& item = exportInfo->Items[i];
+ for (size_t i : xrange(exportInfo.Items.size())) {
+ const auto& item = exportInfo.Items[i];
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) {
// only tables can be targets of the copy tables operation
@@ -633,9 +633,9 @@ private:
return InvalidTxId;
}
- TTxId GetActiveBackupTxId(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+ TTxId GetActiveBackupTxId(const TExportInfo& exportInfo, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
Y_ABORT_UNLESS(item.State == EState::Transferring);
@@ -663,36 +663,36 @@ private:
}
}
- void Cancel(TExportInfo::TPtr exportInfo, ui32 itemIdx, TStringBuf marker) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
- const auto& item = exportInfo->Items.at(itemIdx);
+ void Cancel(TExportInfo& exportInfo, ui32 itemIdx, TStringBuf marker) {
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
+ const auto& item = exportInfo.Items.at(itemIdx);
LOG_N("TExport::TTxProgress: " << marker << ", cancelling"
- << ", info# " << exportInfo->ToString()
+ << ", info# " << exportInfo.ToString()
<< ", item# " << item.ToString(itemIdx));
- exportInfo->State = EState::Cancelled;
+ exportInfo.State = EState::Cancelled;
- if (auto metadataUploader = std::exchange(exportInfo->ExportMetadataUploader, {})) {
+ if (auto metadataUploader = std::exchange(exportInfo.ExportMetadataUploader, {})) {
Send(metadataUploader, new TEvents::TEvPoisonPill());
Self->RunningExportSchemeUploaders.erase(metadataUploader);
}
- for (ui32 i : xrange(exportInfo->Items.size())) {
- KillChildActors(exportInfo->Items[i]);
+ for (ui32 i : xrange(exportInfo.Items.size())) {
+ KillChildActors(exportInfo.Items[i]);
if (i == itemIdx) {
continue;
}
- if (exportInfo->Items.at(i).State != EState::Transferring) {
+ if (exportInfo.Items.at(i).State != EState::Transferring) {
continue;
}
CancelTransferring(exportInfo, i);
}
- if (exportInfo->State == EState::Cancelled) {
- exportInfo->EndTime = TAppData::TimeProvider->Now();
+ if (exportInfo.State == EState::Cancelled) {
+ exportInfo.EndTime = TAppData::TimeProvider->Now();
}
}
@@ -748,14 +748,14 @@ private:
case EState::CreateExportDir:
case EState::CopyTables:
if (exportInfo->WaitTxId == InvalidTxId) {
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
} else {
- SubscribeTx(exportInfo);
+ SubscribeTx(*exportInfo);
}
break;
case EState::UploadExportMetadata:
- Y_ABORT_UNLESS(UploadExportMetadata(exportInfo, ctx));
+ Y_ABORT_UNLESS(UploadExportMetadata(*exportInfo, ctx));
break;
case EState::Transferring: {
@@ -767,15 +767,15 @@ private:
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable && item.State <= EState::Transferring) {
pendingTables.emplace_back(itemIdx);
} else {
- UploadScheme(exportInfo, itemIdx, ctx);
+ UploadScheme(*exportInfo, itemIdx, ctx);
}
} else {
- SubscribeTx(exportInfo, itemIdx);
+ SubscribeTx(*exportInfo, itemIdx);
}
}
exportInfo->PendingItems = std::move(pendingTables);
for (ui32 itemIdx : exportInfo->PendingItems) {
- AllocateTxId(exportInfo, itemIdx);
+ AllocateTxId(*exportInfo, itemIdx);
}
break;
}
@@ -790,17 +790,17 @@ private:
continue;
}
- if (!CancelTransferring(exportInfo, itemIdx)) {
- const TTxId txId = GetActiveBackupTxId(exportInfo, itemIdx);
+ if (!CancelTransferring(*exportInfo, itemIdx)) {
+ const TTxId txId = GetActiveBackupTxId(*exportInfo, itemIdx);
if (txId == InvalidTxId) {
item.State = EState::Cancelled;
} else {
item.WaitTxId = txId;
- CancelTransferring(exportInfo, itemIdx);
+ CancelTransferring(*exportInfo, itemIdx);
}
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
}
}
@@ -808,7 +808,7 @@ private:
exportInfo->EndTime = TAppData::TimeProvider->Now();
}
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
SendNotificationsIfFinished(exportInfo);
break;
@@ -824,16 +824,16 @@ private:
if (item.WaitTxId == InvalidTxId) {
exportInfo->PendingDropItems.push_back(itemIdx);
- AllocateTxId(exportInfo, itemIdx);
+ AllocateTxId(*exportInfo, itemIdx);
} else {
- SubscribeTx(exportInfo, itemIdx);
+ SubscribeTx(*exportInfo, itemIdx);
}
}
} else {
if (exportInfo->WaitTxId == InvalidTxId) {
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
} else {
- SubscribeTx(exportInfo);
+ SubscribeTx(*exportInfo);
}
}
break;
@@ -847,9 +847,9 @@ private:
exportInfo->State = finalState;
exportInfo->EndTime = TAppData::TimeProvider->Now();
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
SendNotificationsIfFinished(exportInfo);
- AuditLogExportEnd(*exportInfo.Get(), Self);
+ AuditLogExportEnd(*exportInfo, Self);
}
void OnAllocateResult() {
@@ -873,11 +873,11 @@ private:
switch (exportInfo->State) {
case EState::CreateExportDir:
- MkDir(exportInfo, txId);
+ MkDir(*exportInfo, txId);
break;
case EState::CopyTables:
- CopyTables(exportInfo, txId);
+ CopyTables(*exportInfo, txId);
break;
case EState::Transferring:
@@ -886,7 +886,7 @@ private:
}
itemIdx = PopFront(exportInfo->PendingItems);
if (const auto type = exportInfo->Items.at(itemIdx).SourcePathType; type == NKikimrSchemeOp::EPathTypeTable) {
- TransferData(exportInfo, itemIdx, txId);
+ TransferData(*exportInfo, itemIdx, txId);
} else {
LOG_W("TExport::TTxProgress: OnAllocateResult allocated a needless txId for an item transferring"
<< ": id# " << id
@@ -901,9 +901,9 @@ private:
case EState::Dropping:
if (exportInfo->PendingDropItems) {
itemIdx = PopFront(exportInfo->PendingDropItems);
- DropTable(exportInfo, itemIdx, txId);
+ DropTable(*exportInfo, itemIdx, txId);
} else {
- DropDir(exportInfo, txId);
+ DropDir(*exportInfo, txId);
}
break;
@@ -959,14 +959,14 @@ private:
if (record.GetPathCreateTxId()) {
txId = TTxId(record.GetPathCreateTxId());
} else {
- txId = GetActiveCopyingTxId(exportInfo);
+ txId = GetActiveCopyingTxId(*exportInfo);
}
}
break;
case EState::Transferring:
if (isMultipleMods) {
- txId = GetActiveBackupTxId(exportInfo, itemIdx);
+ txId = GetActiveBackupTxId(*exportInfo, itemIdx);
}
break;
@@ -985,9 +985,9 @@ private:
THolder<IEventBase> ev;
if (itemIdx < exportInfo->Items.size()) {
- ev.Reset(DropPropose(Self, txId, exportInfo, itemIdx).Release());
+ ev.Reset(DropPropose(Self, txId, *exportInfo, itemIdx).Release());
} else {
- ev.Reset(DropPropose(Self, txId, exportInfo).Release());
+ ev.Reset(DropPropose(Self, txId, *exportInfo).Release());
}
ctx.TActivationContext::Schedule(TDuration::Seconds(10),
@@ -1009,13 +1009,13 @@ private:
item.State = EState::Cancelled;
item.Issue = record.GetReason();
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (!exportInfo->IsInProgress()) {
return;
}
- Cancel(exportInfo, itemIdx, "unhappy propose");
+ Cancel(*exportInfo, itemIdx, "unhappy propose");
} else {
if (!exportInfo->IsInProgress()) {
return;
@@ -1048,7 +1048,7 @@ private:
exportInfo->EndTime = TAppData::TimeProvider->Now();
}
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
return SendNotificationsIfFinished(exportInfo);
}
@@ -1058,21 +1058,21 @@ private:
switch (exportInfo->State) {
case EState::CreateExportDir:
exportInfo->ExportPathId = Self->MakeLocalId(TLocalPathId(record.GetPathId()));
- Self->PersistExportPathId(db, exportInfo);
+ Self->PersistExportPathId(db, *exportInfo);
exportInfo->WaitTxId = txId;
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
break;
case EState::CopyTables:
exportInfo->WaitTxId = txId;
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
break;
case EState::Transferring:
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
exportInfo->Items.at(itemIdx).WaitTxId = txId;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
break;
case EState::AutoDropping:
@@ -1080,19 +1080,19 @@ private:
if (!exportInfo->AllItemsAreDropped()) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
exportInfo->Items.at(itemIdx).WaitTxId = txId;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
} else {
exportInfo->WaitTxId = txId;
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
}
break;
case EState::Cancellation:
if (itemIdx < exportInfo->Items.size()) {
exportInfo->Items.at(itemIdx).WaitTxId = txId;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
- CancelTransferring(exportInfo, itemIdx);
+ CancelTransferring(*exportInfo, itemIdx);
}
return;
@@ -1145,32 +1145,32 @@ private:
if (!result.Success) {
item.State = EState::Cancelled;
item.Issue = result.Error;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (!exportInfo->IsInProgress()) {
return;
}
- Cancel(exportInfo, itemIdx, "unsuccessful scheme upload");
+ Cancel(*exportInfo, itemIdx, "unsuccessful scheme upload");
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
return SendNotificationsIfFinished(exportInfo);
}
if (exportInfo->State == EState::Transferring) {
item.State = EState::Done;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
if (!AppData()->FeatureFlags.GetEnableExportAutoDropping()) {
EndExport(exportInfo, EState::Done, db);
} else {
- PrepareAutoDropping(Self, exportInfo, db);
+ PrepareAutoDropping(Self, *exportInfo, db);
}
}
} else if (exportInfo->State == EState::Cancellation) {
item.State = EState::Cancelled;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (AllOf(exportInfo->Items, [](const TExportInfo::TItem& item) {
// on cancellation we wait only for transferring items
@@ -1213,28 +1213,28 @@ private:
exportInfo->EndTime = TAppData::TimeProvider->Now();
exportInfo->Issue = result.Error;
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
return SendNotificationsIfFinished(exportInfo);
}
Y_ABORT_UNLESS(exportInfo->State == EState::UploadExportMetadata);
if (AnyOf(exportInfo->Items, &IsPathTypeTable)) {
exportInfo->State = EState::CopyTables;
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
} else {
// None of the items is a table.
for (ui32 i : xrange(exportInfo->Items.size())) {
exportInfo->Items[i].State = EState::Transferring;
- Self->PersistExportItemState(db, exportInfo, i);
+ Self->PersistExportItemState(db, *exportInfo, i);
- UploadScheme(exportInfo, i, ctx);
+ UploadScheme(*exportInfo, i, ctx);
}
exportInfo->State = EState::Transferring;
exportInfo->PendingItems.clear();
}
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
}
void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) {
@@ -1287,31 +1287,28 @@ private:
exportInfo->WaitTxId = InvalidTxId;
const bool supportEncryptedExport = AppData()->FeatureFlags.GetEnableEncryptedExport();
- if (TString issues; supportEncryptedExport && !FillExportMetadata(exportInfo, issues)) {
+ if (TString issues; supportEncryptedExport && !FillExportMetadata(*exportInfo, issues)) {
exportInfo->State = EState::Cancelled;
exportInfo->EndTime = TAppData::TimeProvider->Now();
exportInfo->Issue = issues;
break;
}
- if (supportEncryptedExport && UploadExportMetadata(exportInfo, ctx)) {
+ if (supportEncryptedExport && UploadExportMetadata(*exportInfo, ctx)) {
exportInfo->State = EState::UploadExportMetadata;
// Persist modified metadata and new settings
- db.Table<Schema::Exports>().Key(exportInfo->Id).Update(
- NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo->Settings),
- NIceDb::TUpdate<Schema::Exports::ExportMetadata>(exportInfo->ExportMetadata)
- );
+ Self->PersistExportMetadata(db, *exportInfo);
} else if (AnyOf(exportInfo->Items, &IsPathTypeTable)) {
exportInfo->State = EState::CopyTables;
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
} else {
// None of the items is a table.
for (ui32 i : xrange(exportInfo->Items.size())) {
exportInfo->Items[i].State = EState::Transferring;
- Self->PersistExportItemState(db, exportInfo, i);
+ Self->PersistExportItemState(db, *exportInfo, i);
- UploadScheme(exportInfo, i, ctx);
+ UploadScheme(*exportInfo, i, ctx);
}
exportInfo->State = EState::Transferring;
@@ -1324,7 +1321,7 @@ private:
if (exportInfo->DependencyTxIds.contains(txId)) {
exportInfo->DependencyTxIds.erase(txId);
if (exportInfo->DependencyTxIds.empty()) {
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
}
return;
}
@@ -1335,17 +1332,17 @@ private:
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
auto& item = exportInfo->Items[itemIdx];
item.State = EState::Transferring;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
tables.emplace_back(itemIdx);
} else {
- UploadScheme(exportInfo, itemIdx, ctx);
+ UploadScheme(*exportInfo, itemIdx, ctx);
}
}
exportInfo->PendingItems = std::move(tables);
for (ui32 itemIdx : exportInfo->PendingItems) {
- AllocateTxId(exportInfo, itemIdx);
+ AllocateTxId(*exportInfo, itemIdx);
}
break;
}
@@ -1359,9 +1356,9 @@ private:
bool itemHasIssues = false;
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
- if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
+ if (const auto issue = GetIssues(ItemPathId(Self, *exportInfo, itemIdx), txId)) {
item.Issue = *issue;
- Cancel(exportInfo, itemIdx, "issues during backing up");
+ Cancel(*exportInfo, itemIdx, "issues during backing up");
itemHasIssues = true;
}
}
@@ -1370,11 +1367,11 @@ private:
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
} else {
- PrepareAutoDropping(Self, exportInfo, db);
+ PrepareAutoDropping(Self, *exportInfo, db);
}
}
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
break;
}
@@ -1386,10 +1383,10 @@ private:
item.State = EState::Dropped;
item.WaitTxId = InvalidTxId;
- Self->PersistExportItemState(db, exportInfo, itemIdx);
+ Self->PersistExportItemState(db, *exportInfo, itemIdx);
if (exportInfo->AllItemsAreDropped()) {
- AllocateTxId(exportInfo);
+ AllocateTxId(*exportInfo);
}
} else {
SendNotificationsIfFinished(exportInfo, true); // for tests
@@ -1403,7 +1400,7 @@ private:
}
Self->Exports.erase(exportInfo->Id);
- Self->PersistRemoveExport(db, exportInfo);
+ Self->PersistRemoveExport(db, *exportInfo);
}
return;
@@ -1411,7 +1408,7 @@ private:
return SendNotificationsIfFinished(exportInfo);
}
- Self->PersistExportState(db, exportInfo);
+ Self->PersistExportState(db, *exportInfo);
SendNotificationsIfFinished(exportInfo);
if (exportInfo->IsFinished()) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp
index d4455cc4083..f02780a26d0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp
@@ -72,13 +72,13 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
}
Self->Exports.erase(exportInfo->Id);
- Self->PersistRemoveExport(db, exportInfo);
+ Self->PersistRemoveExport(db, *exportInfo);
} else {
LOG_D("TExport::TTxForget, dropping export tables"
<< ", info: " << exportInfo->ToString()
);
-
- PrepareDropping(Self, exportInfo, db);
+
+ PrepareDropping(Self, *exportInfo, db);
Progress = true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
index 33d3c1ff0db..cc3f59c4933 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
@@ -14,24 +14,24 @@ namespace NSchemeShard {
THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
) {
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
- if (exportInfo->UserSID) {
- record.SetOwner(*exportInfo->UserSID);
+ if (exportInfo.UserSID) {
+ record.SetOwner(*exportInfo.UserSID);
}
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpMkDir);
modifyScheme.SetInternal(true);
- const TPath domainPath = TPath::Init(exportInfo->DomainPathId, ss);
+ const TPath domainPath = TPath::Init(exportInfo.DomainPathId, ss);
modifyScheme.SetWorkingDir(domainPath.PathString());
auto& mkDir = *modifyScheme.MutableMkDir();
- mkDir.SetName(Sprintf("export-%" PRIu64, exportInfo->Id));
+ mkDir.SetName(Sprintf("export-%" PRIu64, exportInfo.Id));
return propose;
}
@@ -39,13 +39,13 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
) {
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
- if (exportInfo->UserSID) {
- record.SetOwner(*exportInfo->UserSID);
+ if (exportInfo.UserSID) {
+ record.SetOwner(*exportInfo.UserSID);
}
auto& modifyScheme = *record.AddTransaction();
@@ -53,13 +53,13 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(
modifyScheme.SetInternal(true);
auto& copyTables = *modifyScheme.MutableCreateConsistentCopyTables()->MutableCopyTableDescriptions();
- copyTables.Reserve(exportInfo->Items.size());
+ copyTables.Reserve(exportInfo.Items.size());
- const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss);
+ const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss);
const TString& exportPathName = exportPath.PathString();
- for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
- const auto& item = exportInfo->Items.at(itemIdx);
+ for (ui32 itemIdx : xrange(exportInfo.Items.size())) {
+ const auto& item = exportInfo.Items.at(itemIdx);
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) {
continue;
}
@@ -156,10 +156,10 @@ void FillPartitioning(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& desc
THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
ui32 itemIdx
) {
- Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
+ Y_ABORT_UNLESS(itemIdx < exportInfo.Items.size());
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
@@ -167,15 +167,15 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpBackup);
modifyScheme.SetInternal(true);
- const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss);
+ const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss);
const TString& exportPathName = exportPath.PathString();
modifyScheme.SetWorkingDir(exportPathName);
auto& task = *modifyScheme.MutableBackup();
task.SetTableName(ToString(itemIdx));
- task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));
+ task.SetNeedToBill(!exportInfo.UserSID || !ss->SystemBackupSIDs.contains(*exportInfo.UserSID));
- const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
+ const TPath sourcePath = TPath::Init(exportInfo.Items[itemIdx].SourcePathId, ss);
const TPath exportItemPath = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportItemPath.IsResolved()) {
auto sourceDescription = GetTableDescription(ss, sourcePath.Base()->PathId);
@@ -195,14 +195,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.MutableTable()->CopyFrom(sourceDescription);
}
- task.SetSnapshotStep(exportInfo->SnapshotStep);
- task.SetSnapshotTxId(exportInfo->SnapshotTxId);
+ task.SetSnapshotStep(exportInfo.SnapshotStep);
+ task.SetSnapshotTxId(exportInfo.SnapshotTxId);
- switch (exportInfo->Kind) {
+ switch (exportInfo.Kind) {
case TExportInfo::EKind::YT:
{
Ydb::Export::ExportToYtSettings exportSettings;
- Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
+ Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings));
task.SetNumberOfRetries(exportSettings.number_of_retries());
auto& backupSettings = *task.MutableYTSettings();
@@ -217,7 +217,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
case TExportInfo::EKind::S3:
{
Ydb::Export::ExportToS3Settings exportSettings;
- Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
+ Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings));
task.SetNumberOfRetries(exportSettings.number_of_retries());
auto& backupSettings = *task.MutableS3Settings();
@@ -248,14 +248,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression));
}
- task.SetEnableChecksums(exportInfo->EnableChecksums);
- task.SetEnablePermissions(exportInfo->EnablePermissions);
+ task.SetEnableChecksums(exportInfo.EnableChecksums);
+ task.SetEnablePermissions(exportInfo.EnablePermissions);
if (exportSettings.has_encryption_settings()) {
auto& encryptionSettings = *task.MutableEncryptionSettings();
- encryptionSettings.SetEncryptionAlgorithm(exportInfo->ExportMetadata.GetEncryptionAlgorithm());
- Y_ABORT_UNLESS(itemIdx < exportInfo->ExportMetadata.SchemaMappingSize());
- encryptionSettings.SetIV(exportInfo->ExportMetadata.GetSchemaMapping(itemIdx).GetIV());
+ encryptionSettings.SetEncryptionAlgorithm(exportInfo.ExportMetadata.GetEncryptionAlgorithm());
+ Y_ABORT_UNLESS(itemIdx < exportInfo.ExportMetadata.SchemaMappingSize());
+ encryptionSettings.SetIV(exportInfo.ExportMetadata.GetSchemaMapping(itemIdx).GetIV());
*encryptionSettings.MutableSymmetricKey() = exportSettings.encryption_settings().symmetric_key();
}
}
@@ -268,7 +268,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
ui32 itemIdx
) {
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
@@ -277,7 +277,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable);
modifyScheme.SetInternal(true);
- const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss);
+ const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss);
modifyScheme.SetWorkingDir(exportPath.PathString());
auto& drop = *modifyScheme.MutableDrop();
@@ -289,7 +289,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
) {
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
@@ -297,30 +297,30 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpRmDir);
modifyScheme.SetInternal(true);
- const TPath domainPath = TPath::Init(exportInfo->DomainPathId, ss);
+ const TPath domainPath = TPath::Init(exportInfo.DomainPathId, ss);
modifyScheme.SetWorkingDir(domainPath.PathString());
auto& drop = *modifyScheme.MutableDrop();
- drop.SetName(Sprintf("export-%" PRIu64, exportInfo->Id));
+ drop.SetName(Sprintf("export-%" PRIu64, exportInfo.Id));
return propose;
}
THolder<TEvSchemeShard::TEvCancelTx> CancelPropose(
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
TTxId backupTxId
) {
auto propose = MakeHolder<TEvSchemeShard::TEvCancelTx>();
auto& record = propose->Record;
- record.SetTxId(exportInfo->Id);
+ record.SetTxId(exportInfo.Id);
record.SetTargetTxId(ui64(backupTxId));
return propose;
}
-TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx) {
- const TPath exportPath = TPath::Init(exportInfo->ExportPathId, ss);
+TString ExportItemPathName(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx) {
+ const TPath exportPath = TPath::Init(exportInfo.ExportPathId, ss);
return ExportItemPathName(exportPath.PathString(), itemIdx);
}
@@ -330,26 +330,26 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) {
void PrepareDropping(
TSchemeShard* ss,
- TExportInfo::TPtr exportInfo,
+ TExportInfo& exportInfo,
NIceDb::TNiceDb& db,
TExportInfo::EState droppingState,
std::function<void(ui64)> func)
{
Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState));
- exportInfo->WaitTxId = InvalidTxId;
- exportInfo->State = droppingState;
+ exportInfo.WaitTxId = InvalidTxId;
+ exportInfo.State = droppingState;
ss->PersistExportState(db, exportInfo);
- for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
- auto& item = exportInfo->Items.at(itemIdx);
+ for (ui32 itemIdx : xrange(exportInfo.Items.size())) {
+ auto& item = exportInfo.Items.at(itemIdx);
item.WaitTxId = InvalidTxId;
item.State = TExportInfo::EState::Dropped;
const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss);
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
item.State = TExportInfo::EState::Dropping;
- if (exportInfo->State == TExportInfo::EState::AutoDropping) {
+ if (exportInfo.State == TExportInfo::EState::AutoDropping) {
func(itemIdx);
}
}
@@ -358,7 +358,7 @@ void PrepareDropping(
}
}
-void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
+void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db) {
PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){});
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h
index 97f9d18e33e..37938ec1252 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h
+++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h
@@ -10,46 +10,46 @@ namespace NSchemeShard {
THolder<TEvSchemeShard::TEvModifySchemeTransaction> MkDirPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
);
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
);
THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
ui32 itemIdx
);
THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
ui32 itemIdx
);
THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropPropose(
TSchemeShard* ss,
TTxId txId,
- const TExportInfo::TPtr exportInfo
+ const TExportInfo& exportInfo
);
THolder<TEvSchemeShard::TEvCancelTx> CancelPropose(
- const TExportInfo::TPtr exportInfo,
+ const TExportInfo& exportInfo,
TTxId backupTxId
);
-TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx);
+TString ExportItemPathName(TSchemeShard* ss, const TExportInfo& exportInfo, ui32 itemIdx);
TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx);
-void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db,
+void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db,
TExportInfo::EState droppingState, std::function<void(ui64)> func);
-void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db);
+void PrepareDropping(TSchemeShard* ss, TExportInfo& exportInfo, NIceDb::TNiceDb& db);
} // NSchemeShard
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index b740da24634..40f6a4dc542 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1269,11 +1269,12 @@ public:
void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo);
- static void PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo);
- static void PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo);
- static void PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo);
- static void PersistExportState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo);
- static void PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo::TPtr exportInfo, ui32 targetIdx);
+ static void PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
+ static void PersistRemoveExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
+ static void PersistExportPathId(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
+ static void PersistExportState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
+ static void PersistExportMetadata(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
+ static void PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo& exportInfo, ui32 targetIdx);
struct TExport {
struct TTxCreate;