diff options
author | Daniil Demin <deminds@ydb.tech> | 2025-02-14 23:56:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-14 23:56:24 +0300 |
commit | 952ca0f30d296247f95ab781b0a5aa5b485ac2c7 (patch) | |
tree | 2bdf96b8e50d25c92281cdfd468024b93778fb64 | |
parent | 92e49daeb599088679ff4865f6da76c3cef1985c (diff) | |
download | ydb-952ca0f30d296247f95ab781b0a5aa5b485ac2c7.tar.gz |
External data sources: restore (#14586)
-rw-r--r-- | ydb/apps/ydb/CHANGELOG.md | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_impl.cpp | 177 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_impl.h | 13 |
3 files changed, 176 insertions, 15 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md index bc683fd2a6..02a24b7207 100644 --- a/ydb/apps/ydb/CHANGELOG.md +++ b/ydb/apps/ydb/CHANGELOG.md @@ -1,3 +1,4 @@ +* Include external data sources and external tables in local backups (`ydb tools dump` and `ydb tools restore`). Both scheme objects are backed up as YQL creation queries saved in the `create_external_data_source.sql` and `create_external_table.sql` files respectively, which can be executed to recreate the original scheme objects. * Fixed a bug where `ydb auth get-token` command tried to authenticate twice: while listing endpoints and while executing actual token request. * Fixed a bug where `ydb import file csv` command was saving progress even if a batch upload had failed. * Include coordination nodes in local backups (`ydb tools dump` and `ydb tools restore`). Rate limiters that utilize the coordination node are saved in the coordination node's backup folder, preserving the existing path hierarchy. 
diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index e7bcde21bb..41a0a2577f 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -69,6 +69,14 @@ TString ReadAsyncReplicationQuery(const TFsPath& fsDirPath, const TLog* log) { return ReadFromFile(fsDirPath, log, NFiles::CreateAsyncReplication()); } +TString ReadExternalDataSourceQuery(const TFsPath& fsDirPath, const TLog* log) { + return ReadFromFile(fsDirPath, log, NFiles::CreateExternalDataSource()); +} + +TString ReadExternalTableQuery(const TFsPath& fsDirPath, const TLog* log) { + return ReadFromFile(fsDirPath, log, NFiles::CreateExternalTable()); +} + template <typename TProtoType> TProtoType ReadProtoFromFile(const TFsPath& fsDirPath, const TLog* log, const NFiles::TFileInfo& fileInfo) { TProtoType proto; @@ -340,6 +348,16 @@ TRestoreResult TRestoreClient::RetryViewRestoration() { return result; } +TRestoreResult TRestoreClient::RestoreExternalTables() { + for (const auto& [fsPath, dbPath, settings, isAlreadyExisting] : ExternalTableRestorationCalls) { + auto result = RestoreExternalTable(fsPath, dbPath, settings, isAlreadyExisting); + if (!result.IsSuccess()) { + return result; + } + } + return Result<TRestoreResult>(); +} + TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) { LOG_I("Restore " << fsPath.Quote() << " to " << dbPath.Quote()); @@ -376,8 +394,11 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP // restore auto restoreResult = RestoreFolder(fsPath, dbPath, "", settings, oldEntries); - if (auto retryViewResult = RetryViewRestoration(); !retryViewResult.IsSuccess()) { - restoreResult = retryViewResult; + if (auto result = RetryViewRestoration(); !result.IsSuccess()) { + restoreResult = result; + } + if (auto result = RestoreExternalTables(); !result.IsSuccess()) { + restoreResult = 
result; } if (restoreResult.IsSuccess()) { @@ -399,6 +420,8 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP return restoreResult; } + TVector<const TSchemeEntry*> entriesToDropInSecondPass; + for (const auto& entry : newDirectoryList.Entries) { if (oldEntries.contains(entry.Name)) { continue; @@ -419,7 +442,7 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP break; case ESchemeEntryType::View: result = QueryClient.RetryQuerySync([&path = fullPath](NQuery::TSession session) { - return session.ExecuteQuery(std::format("DROP VIEW IF EXISTS `{}`;", path), + return session.ExecuteQuery(std::format("DROP VIEW `{}`;", path), NQuery::TTxControl::NoTx()).ExtractValueSync(); }); break; @@ -433,6 +456,15 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP return client.DropNode(path).ExtractValueSync(); }); break; + case ESchemeEntryType::ExternalDataSource: + entriesToDropInSecondPass.emplace_back(&entry); + continue; + case ESchemeEntryType::ExternalTable: + result = QueryClient.RetryQuerySync([&path = fullPath](NQuery::TSession session) { + return session.ExecuteQuery(std::format("DROP EXTERNAL TABLE `{}`;", path), + NQuery::TTxControl::NoTx()).ExtractValueSync(); + }); + break; default: break; } @@ -442,7 +474,27 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP return restoreResult; } else if (!result->IsSuccess()) { LOG_E("Error removing " << entry.Type << ": " << TString{fullPath}.Quote() - << ": " << result->GetIssues().ToOneLineString()); + << ", issues: " << result->GetIssues().ToOneLineString()); + return restoreResult; + } + } + + for (const auto* entry : entriesToDropInSecondPass) { + TMaybe<TStatus> result; + switch (entry->Type) { + case ESchemeEntryType::ExternalDataSource: + result = QueryClient.RetryQuerySync([&path = entry->Name](NQuery::TSession session) { + return session.ExecuteQuery(std::format("DROP EXTERNAL DATA 
SOURCE `{}`;", path), + NQuery::TTxControl::NoTx()).ExtractValueSync(); + }); + break; + default: + break; + } + Y_ENSURE(result, "Unexpected entry to drop in the second pass"); + if (!result->IsSuccess()) { + LOG_E("Error removing " << entry->Type << ": " << TString{entry->Name}.Quote() + << ", issues: " << result->GetIssues().ToOneLineString()); return restoreResult; } } @@ -476,7 +528,7 @@ TRestoreResult TRestoreClient::RestoreClusterRoot(const TFsPath& fsPath) { } LOG_I("Restore cluster root " << ClusterRootPath.Quote() << " from " << fsPath.GetPath().Quote()); - + if (!fsPath.Exists()) { return Result<TRestoreResult>(EStatus::BAD_REQUEST, TStringBuilder() << "Specified folder does not exist: " << fsPath.GetPath()); @@ -508,7 +560,7 @@ TRestoreResult TRestoreClient::RestoreClusterRoot(const TFsPath& fsPath) { if (auto result = RestoreGroupMembers(rootTableClient, fsPath, ClusterRootPath); !result.IsSuccess()) { return result; } - + if (auto result = RestorePermissionsImpl(rootSchemeClient, fsPath, ClusterRootPath); !result.IsSuccess()) { return result; } @@ -521,7 +573,7 @@ TRestoreResult TRestoreClient::WaitForAvailableNodes(const TString& database, TD dbDriverConfig.SetDatabase(database); THPTimer timer; - + NDiscovery::TDiscoveryClient client(dbDriverConfig); TDuration retrySleep = TDuration::MilliSeconds(1000); while (true) { @@ -561,13 +613,13 @@ TRestoreResult TRestoreClient::RestoreUsers(TTableClient& client, const TFsPath& auto statementResult = client.RetryOperationSync([&](TSession session) { return session.ExecuteSchemeQuery(statement).ExtractValueSync(); }); - + if (statement.StartsWith("CREATE") && statementResult.GetStatus() == EStatus::PRECONDITION_FAILED && statementResult.GetIssues().ToOneLineString().find("exists") != TString::npos) { LOG_D("User from create statement " << statement.Quote() << " already exists, trying to alter it"); - auto alterStatement = "ALTER" + statement.substr(6); + auto alterStatement = "ALTER" + statement.substr(6); 
auto alterStatementResult = client.RetryOperationSync([&](TSession session) { return session.ExecuteSchemeQuery(alterStatement).ExtractValueSync(); }); @@ -658,7 +710,7 @@ TRestoreResult TRestoreClient::ReplaceClusterRoot(TString& outPath) { if (clusterRootEnd != std::string::npos) { outPath = ClusterRootPath + outPath.substr(clusterRootEnd); } else { - return Result<TRestoreResult>(EStatus::INTERNAL_ERROR, + return Result<TRestoreResult>(EStatus::INTERNAL_ERROR, TStringBuilder() << "Can't find cluster root path in " << outPath.Quote() << " to replace it on " << ClusterRootPath.Quote()); @@ -686,7 +738,7 @@ TRestoreResult TRestoreClient::RestoreDatabaseImpl(const TString& fsPath, const } LOG_I("Restore database from " << fsPath.Quote() << " to " << dbPath.Quote()); - + if (auto result = CreateDatabase(CmsClient, dbPath, TCreateDatabaseSettings(dbDesc)); !result.IsSuccess()) { if (result.GetStatus() == EStatus::ALREADY_EXISTS) { LOG_W("Database " << dbPath.Quote() << " already exists, continue restoring to this database"); @@ -723,8 +775,11 @@ TRestoreResult TRestoreClient::RestoreDatabaseImpl(const TString& fsPath, const if (settings.WithContent_) { auto restoreResult = RestoreFolder(fsPath, dbPath, "", {}, {}); - if (auto retryViewResult = RetryViewRestoration(); !retryViewResult.IsSuccess()) { - restoreResult = retryViewResult; + if (auto result = RetryViewRestoration(); !result.IsSuccess()) { + restoreResult = result; + } + if (auto result = RestoreExternalTables(); !result.IsSuccess()) { + restoreResult = result; } return restoreResult; } else { @@ -761,6 +816,7 @@ TRestoreResult TRestoreClient::RestoreDatabases(const TFsPath& fsPath, const TRe if (IsFileExists(fsPath.Child(NFiles::Database().FileName))) { TRestoreDatabaseSettings dbSettings = { .WaitNodesDuration_ = settings.WaitNodesDuration_, + .Database_ = std::nullopt, .WithContent_ = false }; @@ -866,6 +922,16 @@ TRestoreResult TRestoreClient::RestoreFolder( return RestoreReplication(fsPath, 
dbRestoreRoot, dbPathRelativeToRestoreRoot, settings, oldEntries.contains(objectDbPath)); } + if (IsFileExists(fsPath.Child(NFiles::CreateExternalDataSource().FileName))) { + return RestoreExternalDataSource(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath)); + } + + if (IsFileExists(fsPath.Child(NFiles::CreateExternalTable().FileName))) { + // delay external table restoration + ExternalTableRestorationCalls.emplace_back(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath)); + return Result<TRestoreResult>(); + } + if (IsFileExists(fsPath.Child(NFiles::Empty().FileName))) { return RestoreEmptyDir(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath)); } @@ -889,6 +955,11 @@ TRestoreResult TRestoreClient::RestoreFolder( result = RestoreCoordinationNode(child, childDbPath, settings, oldEntries.contains(childDbPath)); } else if (IsFileExists(child.Child(NFiles::CreateAsyncReplication().FileName))) { result = RestoreReplication(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries.contains(childDbPath)); + } else if (IsFileExists(child.Child(NFiles::CreateExternalDataSource().FileName))) { + result = RestoreExternalDataSource(child, childDbPath, settings, oldEntries.contains(childDbPath)); + } else if (IsFileExists(child.Child(NFiles::CreateExternalTable().FileName))) { + // delay external table restoration + ExternalTableRestorationCalls.emplace_back(child, childDbPath, settings, oldEntries.contains(childDbPath)); } else if (child.IsDirectory()) { result = RestoreFolder(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries); } @@ -900,7 +971,7 @@ TRestoreResult TRestoreClient::RestoreFolder( const bool dbPathExists = oldEntries.contains(dbPath); if (!result.Defined() && !dbPathExists) { - // This situation occurs when all the children of the folder are views. 
+ // This situation occurs when all the children of the folder are views or external tables. return RestoreEmptyDir(fsPath, dbPath, settings, dbPathExists); } @@ -1118,6 +1189,82 @@ TRestoreResult TRestoreClient::RestoreCoordinationNode( return Result<TRestoreResult>(dbPath, std::move(result)); } +TRestoreResult TRestoreClient::RestoreExternalDataSource( + const TFsPath& fsPath, + const TString& dbPath, + const TRestoreSettings& settings, + bool isAlreadyExisting) +{ + LOG_D("Process " << fsPath.GetPath().Quote()); + + if (auto error = ErrorOnIncomplete(fsPath)) { + return *error; + } + + LOG_I("Restore external data source " << fsPath.GetPath().Quote() << " to " << dbPath.Quote()); + + if (settings.DryRun_) { + return CheckExistenceAndType(SchemeClient, dbPath, ESchemeEntryType::ExternalDataSource); + } + + TString query = ReadExternalDataSourceQuery(fsPath, Log.get()); + + NYql::TIssues issues; + if (!RewriteCreateQuery(query, "CREATE EXTERNAL DATA SOURCE IF NOT EXISTS `{}`", dbPath, issues)) { + return Result<TRestoreResult>(fsPath.GetPath(), EStatus::BAD_REQUEST, issues.ToString()); + } + + auto result = QueryClient.RetryQuerySync([&](NQuery::TSession session) { + return session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync(); + }); + + if (result.IsSuccess()) { + LOG_D("Created " << dbPath.Quote()); + return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting); + } + + LOG_E("Failed to create " << dbPath.Quote()); + return Result<TRestoreResult>(dbPath, std::move(result)); +} + +TRestoreResult TRestoreClient::RestoreExternalTable( + const TFsPath& fsPath, + const TString& dbPath, + const TRestoreSettings& settings, + bool isAlreadyExisting) +{ + LOG_D("Process " << fsPath.GetPath().Quote()); + + if (auto error = ErrorOnIncomplete(fsPath)) { + return *error; + } + + LOG_I("Restore external table " << fsPath.GetPath().Quote() << " to " << dbPath.Quote()); + + if (settings.DryRun_) { + return CheckExistenceAndType(SchemeClient, 
dbPath, ESchemeEntryType::ExternalTable); + } + + TString query = ReadExternalTableQuery(fsPath, Log.get()); + + NYql::TIssues issues; + if (!RewriteCreateQuery(query, "CREATE EXTERNAL TABLE IF NOT EXISTS `{}`", dbPath, issues)) { + return Result<TRestoreResult>(fsPath.GetPath(), EStatus::BAD_REQUEST, issues.ToString()); + } + + auto result = QueryClient.RetryQuerySync([&](NQuery::TSession session) { + return session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync(); + }); + + if (result.IsSuccess()) { + LOG_D("Created " << dbPath.Quote()); + return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting); + } + + LOG_E("Failed to create " << dbPath.Quote()); + return Result<TRestoreResult>(dbPath, std::move(result)); +} + TRestoreResult TRestoreClient::RestoreTable( const TFsPath& fsPath, const TString& dbPath, @@ -1544,7 +1691,7 @@ TRestoreResult TRestoreClient::RestorePermissionsImpl( if (result.GetStatus() == EStatus::UNAUTHORIZED) { LOG_W("Not enough rights to restore permissions on " << dbPath.Quote() << ", skipping"); return Result<TRestoreResult>(); - } + } return result; } diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.h b/ydb/public/lib/ydb_cli/dump/restore_impl.h index 2a6dce5303..315ba9204b 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.h +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.h @@ -135,6 +135,8 @@ class TRestoreClient { TRestoreResult RestoreCoordinationNode(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting); TRestoreResult RestoreDependentResources(const TFsPath& fsPath, const TString& dbPath); TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath); + TRestoreResult RestoreExternalDataSource(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting); + TRestoreResult RestoreExternalTable(const TFsPath& fsPath, const TString& dbPath, const 
TRestoreSettings& settings, bool isAlreadyExisting); TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc); TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 partitionCount); @@ -147,6 +149,7 @@ class TRestoreClient { TRestoreResult ReplaceClusterRoot(TString& outPath); TRestoreResult WaitForAvailableNodes(const TString& database, TDuration waitDuration); TRestoreResult RetryViewRestoration(); + TRestoreResult RestoreExternalTables(); TRestoreResult RestoreClusterRoot(const TFsPath& fsPath); TRestoreResult RestoreDatabases(const TFsPath& fsPath, const TRestoreClusterSettings& settings); @@ -196,6 +199,16 @@ private: // If the dependency is not created yet, then the view restoration will fail. // We retry failed view creation attempts until either all views are created, or the errors are persistent. TVector<TRestoreViewCall> ViewRestorationCalls; + + struct TRestoreExternalTableCall { + TFsPath FsPath; + TString DbPath; + TRestoreSettings Settings; + bool IsAlreadyExisting; + }; + // External Tables depend on External Data Sources and need to be restored after them. + TVector<TRestoreExternalTableCall> ExternalTableRestorationCalls; + TString ClusterRootPath; }; // TRestoreClient |