aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-02-11 17:37:44 +0300
committerGitHub <noreply@github.com>2025-02-11 17:37:44 +0300
commit5a8b1e9af4d74067ff21e038ae61ac0520efa45f (patch)
tree0427608f003350b7f3bce110942a71a10dcd4846
parent7a4c2f7e40790497b5140c5930d4959774b2670c (diff)
downloadydb-5a8b1e9af4d74067ff21e038ae61ac0520efa45f.tar.gz
Backup async replication (#14441)
-rw-r--r--ydb/library/backup/backup.cpp110
-rw-r--r--ydb/library/backup/db_iterator.h18
-rw-r--r--ydb/public/lib/ydb_cli/dump/files/files.cpp6
-rw-r--r--ydb/public/lib/ydb_cli/dump/files/files.h1
4 files changed, 134 insertions, 1 deletions
diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp
index 7024072ac2..4d313baa0a 100644
--- a/ydb/library/backup/backup.cpp
+++ b/ydb/library/backup/backup.cpp
@@ -3,6 +3,7 @@
#include "util.h"
#include <ydb-cpp-sdk/client/cms/cms.h>
+#include <ydb-cpp-sdk/client/draft/ydb_replication.h>
#include <ydb-cpp-sdk/client/draft/ydb_view.h>
#include <ydb-cpp-sdk/client/driver/driver.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>
@@ -10,6 +11,7 @@
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/value/value.h>
+#include <ydb/public/api/protos/draft/ydb_replication.pb.h>
#include <ydb/public/api/protos/draft/ydb_view.pb.h>
#include <ydb/public/api/protos/ydb_cms.pb.h>
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
@@ -52,6 +54,7 @@
#include <google/protobuf/text_format.h>
+#include <format>
namespace NYdb::NBackup {
@@ -683,6 +686,110 @@ void BackupCoordinationNode(TDriver driver, const TString& dbPath, const TFsPath
BackupPermissions(driver, dbPath, fsBackupFolder);
}
+namespace {
+
+NReplication::TReplicationDescription DescribeReplication(TDriver driver, const TString& path) {
+ NReplication::TReplicationClient client(driver);
+ auto status = NConsoleClient::RetryFunction([&]() {
+ return client.DescribeReplication(path).ExtractValueSync();
+ });
+ VerifyStatus(status, "describe async replication");
+ return status.GetReplicationDescription();
+}
+
+TString BuildConnectionString(const NReplication::TConnectionParams& params) {
+ return TStringBuilder()
+ << (params.GetEnableSsl() ? "grpcs://" : "grpc://")
+ << params.GetDiscoveryEndpoint()
+ << "/?database=" << params.GetDatabase();
+}
+
+inline TString BuildTarget(const char* src, const char* dst) {
+ return TStringBuilder() << " `" << src << "` AS `" << dst << "`";
+}
+
+inline TString Quote(const char* value) {
+ return TStringBuilder() << "'" << value << "'";
+}
+
+template <typename StringType>
+inline TString Quote(const StringType& value) {
+ return Quote(value.c_str());
+}
+
+inline TString BuildOption(const char* key, const TString& value) {
+ return TStringBuilder() << " " << key << " = " << value << "";
+}
+
+inline TString Interval(const TDuration& value) {
+ return TStringBuilder() << "Interval('PT" << value.Seconds() << "S')";
+}
+
+TString BuildCreateReplicationQuery(
+ const TString& name,
+ const TString& dbPath,
+ const NReplication::TReplicationDescription& desc,
+ const TString& backupRoot,
+ NYql::TIssues& issues)
+{
+ // TODO(ilnaz)
+ Y_UNUSED(dbPath);
+ Y_UNUSED(backupRoot);
+ Y_UNUSED(issues);
+
+ TVector<TString> targets(::Reserve(desc.GetItems().size()));
+ for (const auto& item : desc.GetItems()) {
+ if (!item.DstPath.ends_with("/indexImplTable")) { // TODO(ilnaz): get rid of this hack
+ targets.push_back(BuildTarget(item.SrcPath.c_str(), item.DstPath.c_str()));
+ }
+ }
+
+ const auto& params = desc.GetConnectionParams();
+
+ TVector<TString> opts(::Reserve(5 /* max options */));
+ opts.push_back(BuildOption("CONNECTION_STRING", Quote(BuildConnectionString(params))));
+ switch (params.GetCredentials()) {
+ case NReplication::TConnectionParams::ECredentials::Static:
+ opts.push_back(BuildOption("USER", Quote(params.GetStaticCredentials().User)));
+ opts.push_back(BuildOption("PASSWORD_SECRET_NAME", Quote(params.GetStaticCredentials().PasswordSecretName)));
+ break;
+ case NReplication::TConnectionParams::ECredentials::OAuth:
+ opts.push_back(BuildOption("TOKEN_SECRET_NAME", Quote(params.GetOAuthCredentials().TokenSecretName)));
+ break;
+ }
+
+ opts.push_back(BuildOption("CONSISTENCY_LEVEL", Quote(ToString(desc.GetConsistencyLevel()))));
+ if (desc.GetConsistencyLevel() == NReplication::TReplicationDescription::EConsistencyLevel::Global) {
+ opts.push_back(BuildOption("COMMIT_INTERVAL", Interval(desc.GetGlobalConsistency().GetCommitInterval())));
+ }
+
+ return std::format("CREATE ASYNC REPLICATION `{}`\nFOR\n{}\nWITH (\n{}\n);",
+ name.c_str(), JoinSeq(",\n", targets).c_str(), JoinSeq(",\n", opts).c_str());
+}
+
+}
+
+void BackupReplication(
+ TDriver driver,
+ const TString& dbBackupRoot,
+ const TString& dbPathRelativeToBackupRoot,
+ const TFsPath& fsBackupFolder,
+ NYql::TIssues& issues)
+{
+ Y_ENSURE(!dbPathRelativeToBackupRoot.empty());
+ const auto dbPath = JoinDatabasePath(dbBackupRoot, dbPathRelativeToBackupRoot);
+
+ LOG_I("Backup async replication " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote());
+
+ const auto name = TFsPath(dbPathRelativeToBackupRoot).GetName();
+ const auto desc = DescribeReplication(driver, dbPath);
+ const auto creationQuery = BuildCreateReplicationQuery(name, dbPath, desc, dbBackupRoot, issues);
+ Y_ENSURE(creationQuery, issues.ToString());
+
+ WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateAsyncReplication());
+ BackupPermissions(driver, dbPath, fsBackupFolder);
+}
+
void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) {
if (rootBackupDir) {
LOG_I("Create temporary directory " << path.Quote() << " in database");
@@ -776,6 +883,9 @@ void BackupFolderImpl(TDriver driver, const TString& dbPrefix, const TString& ba
if (dbIt.IsCoordinationNode()) {
BackupCoordinationNode(driver, dbIt.GetFullPath(), childFolderPath);
}
+ if (dbIt.IsReplication()) {
+ BackupReplication(driver, dbIt.GetTraverseRoot(), dbIt.GetRelPath(), childFolderPath, issues);
+ }
dbIt.Next();
}
}
diff --git a/ydb/library/backup/db_iterator.h b/ydb/library/backup/db_iterator.h
index 388f9ef405..1f8bec8973 100644
--- a/ydb/library/backup/db_iterator.h
+++ b/ydb/library/backup/db_iterator.h
@@ -40,6 +40,18 @@ private:
TString TraverseRoot;
TDeque<TSchemeEntryWithPath> NextNodes;
+ static const TVector<NScheme::ESchemeEntryType>& SupportedEntryTypes() {
+ static const TVector<NScheme::ESchemeEntryType> values = {
+ NScheme::ESchemeEntryType::Table,
+ NScheme::ESchemeEntryType::View,
+ NScheme::ESchemeEntryType::Topic,
+ NScheme::ESchemeEntryType::CoordinationNode,
+ NScheme::ESchemeEntryType::Replication,
+ };
+
+ return values;
+ }
+
public:
TDbIterator(TDriver driver, const TString& fullPath)
: Client(driver)
@@ -48,7 +60,7 @@ public:
Y_ENSURE(listResult.IsSuccess(), "Can't list directory, maybe it doesn't exist, dbPath# "
<< fullPath.Quote());
- if (IsIn({NScheme::ESchemeEntryType::Table, NScheme::ESchemeEntryType::View}, listResult.GetEntry().Type)) {
+ if (IsIn(SupportedEntryTypes(), listResult.GetEntry().Type)) {
TPathSplitUnix parentPath(fullPath);
parentPath.pop_back();
TraverseRoot = parentPath.Reconstruct();
@@ -145,6 +157,10 @@ public:
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Directory;
}
+ bool IsReplication() const {
+ return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Replication;
+ }
+
bool IsListed() const {
return NextNodes.front().IsListed;
}
diff --git a/ydb/public/lib/ydb_cli/dump/files/files.cpp b/ydb/public/lib/ydb_cli/dump/files/files.cpp
index 496909c623..885cf4f8ce 100644
--- a/ydb/public/lib/ydb_cli/dump/files/files.cpp
+++ b/ydb/public/lib/ydb_cli/dump/files/files.cpp
@@ -18,6 +18,7 @@ enum EFilesType {
CREATE_USER,
CREATE_GROUP,
ALTER_GROUP,
+ CREATE_ASYNC_REPLICATION,
};
static constexpr TFileInfo FILES_INFO[] = {
@@ -36,6 +37,7 @@ static constexpr TFileInfo FILES_INFO[] = {
{"create_user.sql", "users"},
{"create_group.sql", "groups"},
{"alter_group.sql", "group members"},
+ {"create_async_replication.sql", "async replication"},
};
const TFileInfo& TableScheme() {
@@ -98,4 +100,8 @@ const TFileInfo& AlterGroup() {
return FILES_INFO[ALTER_GROUP];
}
+const TFileInfo& CreateAsyncReplication() {
+ return FILES_INFO[CREATE_ASYNC_REPLICATION];
+}
+
} // NYdb::NDump::NFiles
diff --git a/ydb/public/lib/ydb_cli/dump/files/files.h b/ydb/public/lib/ydb_cli/dump/files/files.h
index 6b0fdca158..96e293e66e 100644
--- a/ydb/public/lib/ydb_cli/dump/files/files.h
+++ b/ydb/public/lib/ydb_cli/dump/files/files.h
@@ -22,5 +22,6 @@ const TFileInfo& Database();
const TFileInfo& CreateUser();
const TFileInfo& CreateGroup();
const TFileInfo& AlterGroup();
+const TFileInfo& CreateAsyncReplication();
} // NYdb::NDump:NFiles