diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2025-02-11 17:37:44 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-11 17:37:44 +0300 |
commit | 5a8b1e9af4d74067ff21e038ae61ac0520efa45f (patch) | |
tree | 0427608f003350b7f3bce110942a71a10dcd4846 | |
parent | 7a4c2f7e40790497b5140c5930d4959774b2670c (diff) | |
download | ydb-5a8b1e9af4d74067ff21e038ae61ac0520efa45f.tar.gz |
Backup async replication (#14441)
-rw-r--r-- | ydb/library/backup/backup.cpp | 110 | ||||
-rw-r--r-- | ydb/library/backup/db_iterator.h | 18 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/files/files.cpp | 6 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/files/files.h | 1 |
4 files changed, 134 insertions, 1 deletions
diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 7024072ac2..4d313baa0a 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -3,6 +3,7 @@ #include "util.h" #include <ydb-cpp-sdk/client/cms/cms.h> +#include <ydb-cpp-sdk/client/draft/ydb_replication.h> #include <ydb-cpp-sdk/client/draft/ydb_view.h> #include <ydb-cpp-sdk/client/driver/driver.h> #include <ydb-cpp-sdk/client/proto/accessor.h> @@ -10,6 +11,7 @@ #include <ydb-cpp-sdk/client/table/table.h> #include <ydb-cpp-sdk/client/topic/client.h> #include <ydb-cpp-sdk/client/value/value.h> +#include <ydb/public/api/protos/draft/ydb_replication.pb.h> #include <ydb/public/api/protos/draft/ydb_view.pb.h> #include <ydb/public/api/protos/ydb_cms.pb.h> #include <ydb/public/api/protos/ydb_rate_limiter.pb.h> @@ -52,6 +54,7 @@ #include <google/protobuf/text_format.h> +#include <format> namespace NYdb::NBackup { @@ -683,6 +686,110 @@ void BackupCoordinationNode(TDriver driver, const TString& dbPath, const TFsPath BackupPermissions(driver, dbPath, fsBackupFolder); } +namespace { + +NReplication::TReplicationDescription DescribeReplication(TDriver driver, const TString& path) { + NReplication::TReplicationClient client(driver); + auto status = NConsoleClient::RetryFunction([&]() { + return client.DescribeReplication(path).ExtractValueSync(); + }); + VerifyStatus(status, "describe async replication"); + return status.GetReplicationDescription(); +} + +TString BuildConnectionString(const NReplication::TConnectionParams& params) { + return TStringBuilder() + << (params.GetEnableSsl() ? "grpcs://" : "grpc://") + << params.GetDiscoveryEndpoint() + << "/?database=" << params.GetDatabase(); +} + +inline TString BuildTarget(const char* src, const char* dst) { + return TStringBuilder() << " `" << src << "` AS `" << dst << "`"; +} + +inline TString Quote(const char* value) { + return TStringBuilder() << "'" << value << "'"; +} + +template <typename StringType> +inline TString Quote(const StringType& value) { + return Quote(value.c_str()); +} + +inline TString BuildOption(const char* key, const TString& value) { + return TStringBuilder() << " " << key << " = " << value << ""; +} + +inline TString Interval(const TDuration& value) { + return TStringBuilder() << "Interval('PT" << value.Seconds() << "S')"; +} + +TString BuildCreateReplicationQuery( + const TString& name, + const TString& dbPath, + const NReplication::TReplicationDescription& desc, + const TString& backupRoot, + NYql::TIssues& issues) +{ + // TODO(ilnaz) + Y_UNUSED(dbPath); + Y_UNUSED(backupRoot); + Y_UNUSED(issues); + + TVector<TString> targets(::Reserve(desc.GetItems().size())); + for (const auto& item : desc.GetItems()) { + if (!item.DstPath.ends_with("/indexImplTable")) { // TODO(ilnaz): get rid of this hack + targets.push_back(BuildTarget(item.SrcPath.c_str(), item.DstPath.c_str())); + } + } + + const auto& params = desc.GetConnectionParams(); + + TVector<TString> opts(::Reserve(5 /* max options */)); + opts.push_back(BuildOption("CONNECTION_STRING", Quote(BuildConnectionString(params)))); + switch (params.GetCredentials()) { + case NReplication::TConnectionParams::ECredentials::Static: + opts.push_back(BuildOption("USER", Quote(params.GetStaticCredentials().User))); + opts.push_back(BuildOption("PASSWORD_SECRET_NAME", Quote(params.GetStaticCredentials().PasswordSecretName))); + break; + case NReplication::TConnectionParams::ECredentials::OAuth: + opts.push_back(BuildOption("TOKEN_SECRET_NAME", Quote(params.GetOAuthCredentials().TokenSecretName))); + break; + } + + opts.push_back(BuildOption("CONSISTENCY_LEVEL", Quote(ToString(desc.GetConsistencyLevel())))); + if (desc.GetConsistencyLevel() == NReplication::TReplicationDescription::EConsistencyLevel::Global) { + opts.push_back(BuildOption("COMMIT_INTERVAL", Interval(desc.GetGlobalConsistency().GetCommitInterval()))); + } + + return std::format("CREATE ASYNC REPLICATION `{}`\nFOR\n{}\nWITH (\n{}\n);", + name.c_str(), JoinSeq(",\n", targets).c_str(), JoinSeq(",\n", opts).c_str()); +} + +} + +void BackupReplication( + TDriver driver, + const TString& dbBackupRoot, + const TString& dbPathRelativeToBackupRoot, + const TFsPath& fsBackupFolder, + NYql::TIssues& issues) +{ + Y_ENSURE(!dbPathRelativeToBackupRoot.empty()); + const auto dbPath = JoinDatabasePath(dbBackupRoot, dbPathRelativeToBackupRoot); + + LOG_I("Backup async replication " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote()); + + const auto name = TFsPath(dbPathRelativeToBackupRoot).GetName(); + const auto desc = DescribeReplication(driver, dbPath); + const auto creationQuery = BuildCreateReplicationQuery(name, dbPath, desc, dbBackupRoot, issues); + Y_ENSURE(creationQuery, issues.ToString()); + + WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateAsyncReplication()); + BackupPermissions(driver, dbPath, fsBackupFolder); +} + void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) { if (rootBackupDir) { LOG_I("Create temporary directory " << path.Quote() << " in database"); @@ -776,6 +883,9 @@ void BackupFolderImpl(TDriver driver, const TString& dbPrefix, const TString& ba if (dbIt.IsCoordinationNode()) { BackupCoordinationNode(driver, dbIt.GetFullPath(), childFolderPath); } + if (dbIt.IsReplication()) { + BackupReplication(driver, dbIt.GetTraverseRoot(), dbIt.GetRelPath(), childFolderPath, issues); + } dbIt.Next(); } } diff --git a/ydb/library/backup/db_iterator.h b/ydb/library/backup/db_iterator.h index 388f9ef405..1f8bec8973 100644 --- a/ydb/library/backup/db_iterator.h +++ b/ydb/library/backup/db_iterator.h @@ -40,6 +40,18 @@ private: TString TraverseRoot; TDeque<TSchemeEntryWithPath> NextNodes; + static const TVector<NScheme::ESchemeEntryType>& SupportedEntryTypes() { + static const TVector<NScheme::ESchemeEntryType> values = { + NScheme::ESchemeEntryType::Table, + NScheme::ESchemeEntryType::View, + NScheme::ESchemeEntryType::Topic, + NScheme::ESchemeEntryType::CoordinationNode, + NScheme::ESchemeEntryType::Replication, + }; + + return values; + } + public: TDbIterator(TDriver driver, const TString& fullPath) : Client(driver) @@ -48,7 +60,7 @@ public: Y_ENSURE(listResult.IsSuccess(), "Can't list directory, maybe it doesn't exist, dbPath# " << fullPath.Quote()); - if (IsIn({NScheme::ESchemeEntryType::Table, NScheme::ESchemeEntryType::View}, listResult.GetEntry().Type)) { + if (IsIn(SupportedEntryTypes(), listResult.GetEntry().Type)) { TPathSplitUnix parentPath(fullPath); parentPath.pop_back(); TraverseRoot = parentPath.Reconstruct(); @@ -145,6 +157,10 @@ public: return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Directory; } + bool IsReplication() const { + return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Replication; + } + bool IsListed() const { return NextNodes.front().IsListed; } diff --git a/ydb/public/lib/ydb_cli/dump/files/files.cpp b/ydb/public/lib/ydb_cli/dump/files/files.cpp index 496909c623..885cf4f8ce 100644 --- a/ydb/public/lib/ydb_cli/dump/files/files.cpp +++ b/ydb/public/lib/ydb_cli/dump/files/files.cpp @@ -18,6 +18,7 @@ enum EFilesType { CREATE_USER, CREATE_GROUP, ALTER_GROUP, + CREATE_ASYNC_REPLICATION, }; static constexpr TFileInfo FILES_INFO[] = { @@ -36,6 +37,7 @@ static constexpr TFileInfo FILES_INFO[] = { {"create_user.sql", "users"}, {"create_group.sql", "groups"}, {"alter_group.sql", "group members"}, + {"create_async_replication.sql", "async replication"}, }; const TFileInfo& TableScheme() { @@ -98,4 +100,8 @@ const TFileInfo& AlterGroup() { return FILES_INFO[ALTER_GROUP]; } +const TFileInfo& CreateAsyncReplication() { + return FILES_INFO[CREATE_ASYNC_REPLICATION]; +} + } // NYdb::NDump::NFiles diff --git a/ydb/public/lib/ydb_cli/dump/files/files.h b/ydb/public/lib/ydb_cli/dump/files/files.h index 6b0fdca158..96e293e66e 100644 --- a/ydb/public/lib/ydb_cli/dump/files/files.h +++ b/ydb/public/lib/ydb_cli/dump/files/files.h @@ -22,5 +22,6 @@ const TFileInfo& Database(); const TFileInfo& CreateUser(); const TFileInfo& CreateGroup(); const TFileInfo& AlterGroup(); +const TFileInfo& CreateAsyncReplication(); } // NYdb::NDump:NFiles |