aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinnokentii <innokentii@yandex-team.com>2023-07-11 17:12:17 +0300
committerinnokentii <innokentii@yandex-team.com>2023-07-11 17:12:17 +0300
commitb1c129f96ce0aacaca34fd7bdc813bcac6f32f58 (patch)
tree42a9a049dce6cc4e754e4bd2c69338d5738f9e1a
parentfa87beed76de6d492f8e7ce5dd00e1ae460de3b9 (diff)
downloadydb-b1c129f96ce0aacaca34fd7bdc813bcac6f32f58.tar.gz
Add metadata to fullbackup
implement basic metadata model add basic metadata to full backup
-rw-r--r--ydb/core/protos/flat_scheme_op.proto3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/backup_restore_common.cpp25
-rw-r--r--ydb/core/tx/datashard/backup_restore_common.h53
-rw-r--r--ydb/core/tx/datashard/backup_restore_traits.h4
-rw-r--r--ydb/core/tx/datashard/export_s3_base_uploader.h54
-rw-r--r--ydb/core/tx/datashard/export_s3_uploader.cpp14
-rw-r--r--ydb/core/tx/datashard/extstorage_usage_config.h4
-rw-r--r--ydb/core/tx/datashard/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h3
-rw-r--r--ydb/core/tx/schemeshard/ut_backup.cpp4
-rw-r--r--ydb/core/tx/schemeshard/ut_export.cpp29
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp13
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h1
21 files changed, 225 insertions, 24 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 30da267d7f..7e50110e64 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -967,6 +967,9 @@ message TBackupTask {
}
optional TCompressionOptions Compression = 13; // currently available for s3
+
+ optional uint64 SnapshotStep = 14;
+ optional uint64 SnapshotTxId = 15;
}
message TRestoreTask {
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index d5ac2c354c..c986918714 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -147,6 +147,7 @@ target_link_libraries(core-tx-datashard PUBLIC
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 2579f32353..01e8828d4a 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 2579f32353..01e8828d4a 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index ab07c22cd0..fd6d20ca80 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp
new file mode 100644
index 0000000000..dad590c13d
--- /dev/null
+++ b/ydb/core/tx/datashard/backup_restore_common.cpp
@@ -0,0 +1,25 @@
+#include "backup_restore_common.h"
+
+namespace NKikimr::NDataShard::NBackupRestore {
+
+void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
+ FullBackups.emplace(fb->SnapshotVts, fb);
+}
+
+TString TMetadata::Serialize() const {
+ NJson::TJsonMap m;
+ m["version"] = 0;
+ NJson::TJsonArray fullBackups;
+ for (auto &[tp, _] : FullBackups) {
+ NJson::TJsonMap backupMap;
+ NJson::TJsonArray vts;
+ vts.AppendValue(tp.Step);
+ vts.AppendValue(tp.TxId);
+ backupMap["snapshot_vts"] = std::move(vts);
+ fullBackups.AppendValue(std::move(backupMap));
+ }
+ m["full_backups"] = fullBackups;
+ return NJson::WriteJson(&m, false);
+}
+
+} // NKikimr::NDataShard::NBackupRestore
diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h
index 5c4a440a1f..45b59960ed 100644
--- a/ydb/core/tx/datashard/backup_restore_common.h
+++ b/ydb/core/tx/datashard/backup_restore_common.h
@@ -6,6 +6,12 @@
#include "datashard_pipeline.h"
#include "execution_unit.h"
+#include <ydb/core/base/row_version.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
namespace NKikimr {
namespace NDataShard {
@@ -133,5 +139,52 @@ public:
}; // TBackupRestoreUnitBase
+namespace NBackupRestore {
+
+using TVirtualTimestamp = TRowVersion;
+
+enum class EStorageType {
+ YT,
+ S3,
+};
+
+struct TLogMetadata : TSimpleRefCount<TLogMetadata> {
+ using TPtr = TIntrusivePtr<TLogMetadata>;
+
+ const TVirtualTimestamp StartVts;
+ TString ConsistencyKey;
+ EStorageType StorageType;
+ TString StoragePath;
+};
+
+struct TFullBackupMetadata : TSimpleRefCount<TFullBackupMetadata> {
+ using TPtr = TIntrusivePtr<TFullBackupMetadata>;
+
+ const TVirtualTimestamp SnapshotVts;
+ TString ConsistencyKey;
+ TLogMetadata::TPtr FollowingLog;
+ EStorageType StorageType;
+ TString StoragePath;
+};
+
+class TMetadata {
+public:
+ TMetadata() = default;
+ TMetadata(TVector<TFullBackupMetadata::TPtr>&& fullBackups, TVector<TLogMetadata::TPtr>&& logs);
+
+ void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
+ void AddLog(TLogMetadata::TPtr log);
+ void SetConsistencyKey(const TString& key);
+
+ TString Serialize() const;
+ static TMetadata Deserialize(const TString& metadata);
+private:
+ TString ConsistencyKey;
+ TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
+ TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
+};
+
+} // NBackupRestore
+
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h
index 78b93232c6..64ee87a2fd 100644
--- a/ydb/core/tx/datashard/backup_restore_traits.h
+++ b/ydb/core/tx/datashard/backup_restore_traits.h
@@ -34,6 +34,10 @@ inline TString SchemeKey(const TString& objKeyPattern) {
return Sprintf("%s/scheme.pb", objKeyPattern.c_str());
}
+inline TString MetadataKey(const TString& objKeyPattern) {
+ return Sprintf("%s/metadata.json", objKeyPattern.c_str());
+}
+
inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) {
const auto ext = DataFileExtension(format, codec);
return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str());
diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h
index 3e551321dc..2c006e4407 100644
--- a/ydb/core/tx/datashard/export_s3_base_uploader.h
+++ b/ydb/core/tx/datashard/export_s3_base_uploader.h
@@ -54,9 +54,9 @@ protected:
Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
- if (!SchemeUploaded) {
- this->Become(&TDerived::StateUploadScheme);
-
+ if (!MetadataUploaded) {
+ UploadMetadata();
+ } else if (!SchemeUploaded) {
UploadScheme();
} else {
this->Become(&TDerived::StateUploadData);
@@ -82,6 +82,21 @@ protected:
.WithKey(Settings.GetSchemeKey())
.WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
+
+ this->Become(&TDerived::StateUploadScheme);
+ }
+
+ void UploadMetadata() {
+ Y_VERIFY(!MetadataUploaded);
+
+ Buffer = std::move(Metadata);
+
+ auto request = Aws::S3::Model::PutObjectRequest()
+ .WithKey(Settings.GetMetadataKey())
+ .WithStorageClass(Settings.GetStorageClass());
+ this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
+
+ this->Become(&TDerived::StateUploadMetadata);
}
void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
@@ -104,6 +119,22 @@ protected:
this->Become(&TDerived::StateUploadData);
}
+ void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse"
+ << ": self# " << this->SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) {
+ return;
+ }
+
+ MetadataUploaded = true;
+
+ UploadScheme();
+ }
+
void Handle(TEvExportScan::TEvReady::TPtr& ev) {
EXPORT_LOG_D("Handle TEvExportScan::TEvReady"
<< ": self# " << this->SelfId()
@@ -115,7 +146,7 @@ protected:
return PassAway();
}
- if (ProxyResolved && SchemeUploaded) {
+ if (ProxyResolved && SchemeUploaded && MetadataUploaded) {
this->Send(Scanner, new TEvExportScan::TEvFeed());
}
}
@@ -403,7 +434,8 @@ public:
explicit TS3UploaderBase(
const TActorId& dataShard, ui64 txId,
const NKikimrSchemeOp::TBackupTask& task,
- TMaybe<Ydb::Table::CreateTableRequest>&& scheme)
+ TMaybe<Ydb::Table::CreateTableRequest>&& scheme,
+ TString&& metadata)
: ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(task.GetS3Settings()))
, Settings(TS3Settings::FromBackupTask(task))
, DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
@@ -411,10 +443,12 @@ public:
, DataShard(dataShard)
, TxId(txId)
, Scheme(std::move(scheme))
+ , Metadata(std::move(metadata))
, Retries(task.GetNumberOfRetries())
, Attempt(0)
, Delay(TDuration::Minutes(1))
, SchemeUploaded(task.GetShardNum() == 0 ? false : true)
+ , MetadataUploaded(task.GetShardNum() == 0 ? false : true)
{
}
@@ -448,6 +482,14 @@ public:
}
}
+ STATEFN(StateUploadMetadata) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata);
+ default:
+ return StateBase(ev);
+ }
+ }
+
STATEFN(StateUploadData) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvBuffer, Handle);
@@ -474,6 +516,7 @@ private:
const TActorId DataShard;
const ui64 TxId;
const TMaybe<Ydb::Table::CreateTableRequest> Scheme;
+ const TString Metadata;
const ui32 Retries;
ui32 Attempt;
@@ -481,6 +524,7 @@ private:
TActorId Client;
TDuration Delay;
bool SchemeUploaded;
+ bool MetadataUploaded;
bool MultiPart;
bool Last;
diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp
index 351544de46..4ba21b3613 100644
--- a/ydb/core/tx/datashard/export_s3_uploader.cpp
+++ b/ydb/core/tx/datashard/export_s3_uploader.cpp
@@ -2,6 +2,8 @@
#include "export_s3_base_uploader.h"
+#include "backup_restore_common.h"
+
namespace NKikimr {
namespace NDataShard {
@@ -25,7 +27,17 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
? GenYdbScheme(Columns, Task.GetTable())
: Nothing();
- return new TS3Uploader(dataShard, txId, Task, std::move(scheme));
+ NBackupRestore::TMetadata metadata;
+
+ NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{
+ .SnapshotVts = NBackupRestore::TVirtualTimestamp(
+ Task.GetSnapshotStep(),
+ Task.GetSnapshotTxId())
+ };
+ metadata.AddFullBackup(backup);
+
+ return new TS3Uploader(
+ dataShard, txId, Task, std::move(scheme), metadata.Serialize());
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h
index 35058bb438..0beeb43b83 100644
--- a/ydb/core/tx/datashard/extstorage_usage_config.h
+++ b/ydb/core/tx/datashard/extstorage_usage_config.h
@@ -41,6 +41,10 @@ public:
Aws::S3::Model::StorageClass GetStorageClass() const;
+ inline TString GetMetadataKey() const {
+ return NBackupRestoreTraits::MetadataKey(ObjectKeyPattern);
+ }
+
inline TString GetSchemeKey() const {
return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern);
}
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 83c04e7ffa..d54c4c9cc0 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
alter_cdc_stream_unit.cpp
alter_table_unit.cpp
+ backup_restore_common.cpp
backup_restore_traits.cpp
backup_unit.cpp
build_and_wait_dependencies_unit.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
index c7fadcec47..e031beab2e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
@@ -3,6 +3,8 @@
#include "schemeshard__operation_common.h"
#include "schemeshard_billing_helpers.h"
#include "schemeshard_impl.h"
+#include "schemeshard_path_describer.h"
+#include "schemeshard_types.h"
#include <ydb/core/base/subdomain.h>
#include <ydb/core/metering/metering.h>
@@ -15,6 +17,7 @@ template <typename TKind>
class TConfigurePart: public TSubOperationState {
const TTxState::ETxType TxType;
const TOperationId OperationId;
+ const TVirtualTimestamp SnapshotTime;
TString DebugHint() const override {
return TStringBuilder()
@@ -23,9 +26,10 @@ class TConfigurePart: public TSubOperationState {
}
public:
- TConfigurePart(TTxState::ETxType type, TOperationId id)
+ TConfigurePart(TTxState::ETxType type, TOperationId id, TVirtualTimestamp snapshotTime)
: TxType(type)
, OperationId(id)
+ , SnapshotTime(snapshotTime)
{
IgnoreMessages(DebugHint(), {});
}
@@ -55,7 +59,11 @@ public:
Y_VERIFY(txState->State == TTxState::ConfigureParts);
txState->ClearShardsInProgress();
- TKind::ProposeTx(OperationId, *txState, context);
+ if constexpr (TKind::NeedSnapshotTime()) {
+ TKind::ProposeTx(OperationId, *txState, context, SnapshotTime);
+ } else {
+ TKind::ProposeTx(OperationId, *txState, context);
+ }
txState->UpdateShardsInProgress(TTxState::ConfigureParts);
return false;
@@ -444,6 +452,7 @@ template <typename TKind, typename TEvCancel>
class TBackupRestoreOperationBase: public TSubOperation {
const TTxState::ETxType TxType;
const TPathElement::EPathState Lock;
+ TVirtualTimestamp SnapshotTime;
static TTxState::ETxState NextState() {
return TTxState::CreateParts;
@@ -490,7 +499,7 @@ class TBackupRestoreOperationBase: public TSubOperation {
case TTxState::CreateParts:
return MakeHolder<TCreateParts>(OperationId);
case TTxState::ConfigureParts:
- return MakeHolder<TConfigurePart<TKind>>(TxType, OperationId);
+ return MakeHolder<TConfigurePart<TKind>>(TxType, OperationId, SnapshotTime);
case TTxState::Propose:
return MakeHolder<TPropose<TKind>>(TxType, OperationId);
case TTxState::ProposedWaitParts:
@@ -627,6 +636,13 @@ public:
return result;
}
+ if constexpr (TKind::NeedSnapshotTime()) {
+ TPathElement::TPtr targetPath = context.SS->PathsById.at(path.Base()->PathId);
+
+ SnapshotTime.SetStep(targetPath->StepCreated);
+ SnapshotTime.SetTxId(targetPath->CreateTxId);
+ }
+
PrepareChanges(path.Base(), context);
SetState(NextState());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp
index 8464c4c942..301c7d2aa1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp
@@ -9,6 +9,10 @@ struct TBackup {
return "TBackup";
}
+ static constexpr bool NeedSnapshotTime() {
+ return true;
+ }
+
static bool HasTask(const TTxTransaction& tx) {
return tx.HasBackup();
}
@@ -17,11 +21,13 @@ struct TBackup {
return tx.GetBackup().GetTableName();
}
- static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
+ static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context, TVirtualTimestamp snapshotTime) {
const auto& pathId = txState.TargetPathId;
Y_VERIFY(context.SS->Tables.contains(pathId));
TTableInfo::TPtr table = context.SS->Tables.at(pathId);
NKikimrSchemeOp::TBackupTask backup = table->BackupSettings;
+ backup.SetSnapshotStep(snapshotTime.Step);
+ backup.SetSnapshotTxId(snapshotTime.TxId);
const auto seqNo = context.SS->StartRound(txState);
for (ui32 i = 0; i < txState.Shards.size(); ++i) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp
index 88196a65e3..73c1de4bdb 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp
@@ -9,6 +9,10 @@ struct TRestore {
return "TRestore";
}
+ static constexpr bool NeedSnapshotTime() {
+ return false;
+ }
+
static bool HasTask(const TTxTransaction& tx) {
return tx.HasRestore();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
index 62a1c5845a..6043045380 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
@@ -109,6 +109,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
}
+ task.SetSnapshotStep(exportInfo->SnapshotStep);
+ task.SetSnapshotTxId(exportInfo->SnapshotTxId);
+
switch (exportInfo->Kind) {
case TExportInfo::EKind::YT:
{
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index b1c373f31a..61062967b0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2672,6 +2672,9 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
TSet<TActorId> Subscribers;
+ ui64 SnapshotStep = 0;
+ ui64 SnapshotTxId = 0;
+
explicit TExportInfo(
const ui64 id,
const TString& uid,
diff --git a/ydb/core/tx/schemeshard/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup.cpp
index be41e95cfe..e59b0e1592 100644
--- a/ydb/core/tx/schemeshard/ut_backup.cpp
+++ b/ydb/core/tx/schemeshard/ut_backup.cpp
@@ -135,11 +135,11 @@ Y_UNIT_TEST_SUITE(TBackupTests) {
}
Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) {
- ShouldSucceedOnLargeData<Codec>(0, std::make_pair(101, 1));
+ ShouldSucceedOnLargeData<Codec>(0, std::make_pair(101, 2));
}
Y_UNIT_TEST(ShouldSucceedOnLargeData_MinWriteBatch) {
- ShouldSucceedOnLargeData<ECompressionCodec::Zstd>(1 << 20, std::make_pair(0, 2));
+ ShouldSucceedOnLargeData<ECompressionCodec::Zstd>(1 << 20, std::make_pair(0, 3));
}
} // TBackupTests
diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp
index 4c17ba3d2a..cc078fc6b1 100644
--- a/ydb/core/tx/schemeshard/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export.cpp
@@ -186,15 +186,25 @@ namespace {
} // anonymous
Y_UNIT_TEST_SUITE(TExportToS3Tests) {
- void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& request) {
+ void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& requestTpl) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());
+ auto request = Sprintf(requestTpl.c_str(), port);
+
TTestEnv env(runtime);
- Run(runtime, env, tables, Sprintf(request.c_str(), port), Ydb::StatusIds::SUCCESS, "/MyRoot", false);
+ Run(runtime, env, tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false);
+
+ for (auto &path : GetExportTargetPaths(request)) {
+ auto canonPath = (path.StartsWith("/") || path.empty()) ? path : TString("/") + path;
+ auto it = s3Mock.GetData().find(canonPath + "/metadata.json");
+ UNIT_ASSERT(it != s3Mock.GetData().end());
+ it = s3Mock.GetData().find(canonPath + "/scheme.pb");
+ UNIT_ASSERT(it != s3Mock.GetData().end());
+ }
}
Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) {
@@ -284,16 +294,6 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
- TString scheme;
- runtime.SetObserverFunc([&scheme](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
- if (!scheme && ev->GetTypeRewrite() == NWrappers::NExternalStorage::EvPutObjectRequest) {
- const auto* msg = ev->Get<NWrappers::NExternalStorage::TEvPutObjectRequest>();
- scheme = msg->Body;
- }
-
- return TTestActorRuntime::EEventAction::PROCESS;
- });
-
const TVector<TString> tables = {R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
@@ -331,6 +331,11 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) {
}
)", port));
+ auto schemeIt = s3Mock.GetData().find("/scheme.pb");
+ UNIT_ASSERT(schemeIt != s3Mock.GetData().end());
+
+ TString scheme = schemeIt->second;
+
UNIT_ASSERT_NO_DIFF(scheme, R"(columns {
name: "key"
type {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index 78538b1614..6327604dc5 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -971,6 +971,19 @@ namespace NSchemeShardUT_Private {
CheckExpectedResult(expectedResults, event->Record.GetStatus(), event->Record.GetResult());
}
+ TVector<TString> GetExportTargetPaths(const TString& requestStr) {
+ NKikimrExport::TCreateExportRequest request;
+ UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request));
+
+ TVector<TString> result;
+
+ for (auto &item : request.GetExportToS3Settings().items()) {
+ result.push_back(item.destination_prefix());
+ }
+
+ return result;
+ }
+
void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) {
NKikimrExport::TCreateExportRequest request;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request));
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 57bdfb7e00..83b2b03290 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -345,6 +345,7 @@ namespace NSchemeShardUT_Private {
NKikimrIndexBuilder::TEvForgetResponse TestForgetBuildIndex(TTestActorRuntime& runtime, const ui64 id, const ui64 schemeShard, const TString &dbName, const ui64 buildIndexId, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
////////// export
+ TVector<TString> GetExportTargetPaths(const TString& requestStr);
void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "");
void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "");
void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "",