diff options
author | innokentii <innokentii@yandex-team.com> | 2023-07-11 17:12:17 +0300 |
---|---|---|
committer | innokentii <innokentii@yandex-team.com> | 2023-07-11 17:12:17 +0300 |
commit | b1c129f96ce0aacaca34fd7bdc813bcac6f32f58 (patch) | |
tree | 42a9a049dce6cc4e754e4bd2c69338d5738f9e1a | |
parent | fa87beed76de6d492f8e7ce5dd00e1ae460de3b9 (diff) | |
download | ydb-b1c129f96ce0aacaca34fd7bdc813bcac6f32f58.tar.gz |
Add metadata to fullbackup
implement basic metadata model
add basic metadata to full backup
21 files changed, 225 insertions, 24 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 30da267d7f..7e50110e64 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -967,6 +967,9 @@ message TBackupTask { } optional TCompressionOptions Compression = 13; // currently available for s3 + + optional uint64 SnapshotStep = 14; + optional uint64 SnapshotTxId = 15; } message TRestoreTask { diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index d5ac2c354c..c986918714 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -147,6 +147,7 @@ target_link_libraries(core-tx-datashard PUBLIC target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 2579f32353..01e8828d4a 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 2579f32353..01e8828d4a 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index ab07c22cd0..fd6d20ca80 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -148,6 +148,7 @@ target_link_libraries(core-tx-datashard PUBLIC target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp new file mode 100644 index 0000000000..dad590c13d --- /dev/null +++ b/ydb/core/tx/datashard/backup_restore_common.cpp @@ -0,0 +1,25 @@ +#include "backup_restore_common.h" + +namespace NKikimr::NDataShard::NBackupRestore { + +void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) { + FullBackups.emplace(fb->SnapshotVts, fb); +} + +TString TMetadata::Serialize() const { + NJson::TJsonMap m; + m["version"] = 0; + NJson::TJsonArray fullBackups; + for (auto &[tp, _] : FullBackups) { + NJson::TJsonMap backupMap; + NJson::TJsonArray vts; + vts.AppendValue(tp.Step); + vts.AppendValue(tp.TxId); + backupMap["snapshot_vts"] = std::move(vts); + fullBackups.AppendValue(std::move(backupMap)); + } + m["full_backups"] = fullBackups; + return NJson::WriteJson(&m, false); +} + +} // NKikimr::NDataShard::NBackupRestore diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h index 5c4a440a1f..45b59960ed 100644 --- a/ydb/core/tx/datashard/backup_restore_common.h +++ b/ydb/core/tx/datashard/backup_restore_common.h @@ -6,6 +6,12 @@ #include "datashard_pipeline.h" #include "execution_unit.h" +#include <ydb/core/base/row_version.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> + namespace NKikimr { namespace NDataShard { @@ -133,5 +139,52 @@ public: }; // TBackupRestoreUnitBase +namespace NBackupRestore { + +using TVirtualTimestamp = TRowVersion; + +enum class EStorageType { + YT, + S3, +}; + +struct TLogMetadata : TSimpleRefCount<TLogMetadata> { + using TPtr = TIntrusivePtr<TLogMetadata>; + + const TVirtualTimestamp StartVts; + TString ConsistencyKey; + EStorageType StorageType; + TString StoragePath; +}; + +struct TFullBackupMetadata : TSimpleRefCount<TFullBackupMetadata> { + using TPtr = TIntrusivePtr<TFullBackupMetadata>; + + const TVirtualTimestamp SnapshotVts; + TString ConsistencyKey; + TLogMetadata::TPtr FollowingLog; + EStorageType StorageType; + TString StoragePath; +}; + +class TMetadata { +public: + TMetadata() = default; + TMetadata(TVector<TFullBackupMetadata::TPtr>&& fullBackups, TVector<TLogMetadata::TPtr>&& logs); + + void AddFullBackup(TFullBackupMetadata::TPtr fullBackup); + void AddLog(TLogMetadata::TPtr log); + void SetConsistencyKey(const TString& key); + + TString Serialize() const; + static TMetadata Deserialize(const TString& metadata); +private: + TString ConsistencyKey; + TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups; + TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs; +}; + +} // NBackupRestore + } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h index 78b93232c6..64ee87a2fd 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.h +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -34,6 +34,10 @@ inline TString SchemeKey(const TString& objKeyPattern) { return Sprintf("%s/scheme.pb", objKeyPattern.c_str()); } +inline TString MetadataKey(const TString& objKeyPattern) { + return Sprintf("%s/metadata.json", objKeyPattern.c_str()); +} + inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) { const auto ext = DataFileExtension(format, codec); return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str()); diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h index 3e551321dc..2c006e4407 100644 --- a/ydb/core/tx/datashard/export_s3_base_uploader.h +++ b/ydb/core/tx/datashard/export_s3_base_uploader.h @@ -54,9 +54,9 @@ protected: Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); - if (!SchemeUploaded) { - this->Become(&TDerived::StateUploadScheme); - + if (!MetadataUploaded) { + UploadMetadata(); + } else if (!SchemeUploaded) { UploadScheme(); } else { this->Become(&TDerived::StateUploadData); @@ -82,6 +82,21 @@ protected: .WithKey(Settings.GetSchemeKey()) .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + + this->Become(&TDerived::StateUploadScheme); + } + + void UploadMetadata() { + Y_VERIFY(!MetadataUploaded); + + Buffer = std::move(Metadata); + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Settings.GetMetadataKey()) + .WithStorageClass(Settings.GetStorageClass()); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer))); + + this->Become(&TDerived::StateUploadMetadata); } void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { @@ -104,6 +119,22 @@ protected: this->Become(&TDerived::StateUploadData); } + void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleMetadata TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) { + return; + } + + MetadataUploaded = true; + + UploadScheme(); + } + void Handle(TEvExportScan::TEvReady::TPtr& ev) { EXPORT_LOG_D("Handle TEvExportScan::TEvReady" << ": self# " << this->SelfId() @@ -115,7 +146,7 @@ protected: return PassAway(); } - if (ProxyResolved && SchemeUploaded) { + if (ProxyResolved && SchemeUploaded && MetadataUploaded) { this->Send(Scanner, new TEvExportScan::TEvFeed()); } } @@ -403,7 +434,8 @@ public: explicit TS3UploaderBase( const TActorId& dataShard, ui64 txId, const NKikimrSchemeOp::TBackupTask& task, - TMaybe<Ydb::Table::CreateTableRequest>&& scheme) + TMaybe<Ydb::Table::CreateTableRequest>&& scheme, + TString&& metadata) : ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(task.GetS3Settings())) , Settings(TS3Settings::FromBackupTask(task)) , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) @@ -411,10 +443,12 @@ public: , DataShard(dataShard) , TxId(txId) , Scheme(std::move(scheme)) + , Metadata(std::move(metadata)) , Retries(task.GetNumberOfRetries()) , Attempt(0) , Delay(TDuration::Minutes(1)) , SchemeUploaded(task.GetShardNum() == 0 ? false : true) + , MetadataUploaded(task.GetShardNum() == 0 ? false : true) { } @@ -448,6 +482,14 @@ public: } } + STATEFN(StateUploadMetadata) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadata); + default: + return StateBase(ev); + } + } + STATEFN(StateUploadData) { switch (ev->GetTypeRewrite()) { hFunc(TEvBuffer, Handle); @@ -474,6 +516,7 @@ private: const TActorId DataShard; const ui64 TxId; const TMaybe<Ydb::Table::CreateTableRequest> Scheme; + const TString Metadata; const ui32 Retries; ui32 Attempt; @@ -481,6 +524,7 @@ private: TActorId Client; TDuration Delay; bool SchemeUploaded; + bool MetadataUploaded; bool MultiPart; bool Last; diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 351544de46..4ba21b3613 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -2,6 +2,8 @@ #include "export_s3_base_uploader.h" +#include "backup_restore_common.h" + namespace NKikimr { namespace NDataShard { @@ -25,7 +27,17 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { ? GenYdbScheme(Columns, Task.GetTable()) : Nothing(); - return new TS3Uploader(dataShard, txId, Task, std::move(scheme)); + NBackupRestore::TMetadata metadata; + + NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{ + .SnapshotVts = NBackupRestore::TVirtualTimestamp( + Task.GetSnapshotStep(), + Task.GetSnapshotTxId()) + }; + metadata.AddFullBackup(backup); + + return new TS3Uploader( + dataShard, txId, Task, std::move(scheme), metadata.Serialize()); } } // NDataShard diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index 35058bb438..0beeb43b83 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -41,6 +41,10 @@ public: Aws::S3::Model::StorageClass GetStorageClass() const; + inline TString GetMetadataKey() const { + return NBackupRestoreTraits::MetadataKey(ObjectKeyPattern); + } + inline TString GetSchemeKey() const { return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern); } diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 83c04e7ffa..d54c4c9cc0 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( alter_cdc_stream_unit.cpp alter_table_unit.cpp + backup_restore_common.cpp backup_restore_traits.cpp backup_unit.cpp build_and_wait_dependencies_unit.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h index c7fadcec47..e031beab2e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h @@ -3,6 +3,8 @@ #include "schemeshard__operation_common.h" #include "schemeshard_billing_helpers.h" #include "schemeshard_impl.h" +#include "schemeshard_path_describer.h" +#include "schemeshard_types.h" #include <ydb/core/base/subdomain.h> #include <ydb/core/metering/metering.h> @@ -15,6 +17,7 @@ template <typename TKind> class TConfigurePart: public TSubOperationState { const TTxState::ETxType TxType; const TOperationId OperationId; + const TVirtualTimestamp SnapshotTime; TString DebugHint() const override { return TStringBuilder() @@ -23,9 +26,10 @@ class TConfigurePart: public TSubOperationState { } public: - TConfigurePart(TTxState::ETxType type, TOperationId id) + TConfigurePart(TTxState::ETxType type, TOperationId id, TVirtualTimestamp snapshotTime) : TxType(type) , OperationId(id) + , SnapshotTime(snapshotTime) { IgnoreMessages(DebugHint(), {}); } @@ -55,7 +59,11 @@ public: Y_VERIFY(txState->State == TTxState::ConfigureParts); txState->ClearShardsInProgress(); - TKind::ProposeTx(OperationId, *txState, context); + if constexpr (TKind::NeedSnapshotTime()) { + TKind::ProposeTx(OperationId, *txState, context, SnapshotTime); + } else { + TKind::ProposeTx(OperationId, *txState, context); + } txState->UpdateShardsInProgress(TTxState::ConfigureParts); return false; @@ -444,6 +452,7 @@ template <typename TKind, typename TEvCancel> class TBackupRestoreOperationBase: public TSubOperation { const TTxState::ETxType TxType; const TPathElement::EPathState Lock; + TVirtualTimestamp SnapshotTime; static TTxState::ETxState NextState() { return TTxState::CreateParts; @@ -490,7 +499,7 @@ class TBackupRestoreOperationBase: public TSubOperation { case TTxState::CreateParts: return MakeHolder<TCreateParts>(OperationId); case TTxState::ConfigureParts: - return MakeHolder<TConfigurePart<TKind>>(TxType, OperationId); + return MakeHolder<TConfigurePart<TKind>>(TxType, OperationId, SnapshotTime); case TTxState::Propose: return MakeHolder<TPropose<TKind>>(TxType, OperationId); case TTxState::ProposedWaitParts: @@ -627,6 +636,13 @@ public: return result; } + if constexpr (TKind::NeedSnapshotTime()) { + TPathElement::TPtr targetPath = context.SS->PathsById.at(path.Base()->PathId); + + SnapshotTime.SetStep(targetPath->StepCreated); + SnapshotTime.SetTxId(targetPath->CreateTxId); + } + PrepareChanges(path.Base(), context); SetState(NextState()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp index 8464c4c942..301c7d2aa1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp @@ -9,6 +9,10 @@ struct TBackup { return "TBackup"; } + static constexpr bool NeedSnapshotTime() { + return true; + } + static bool HasTask(const TTxTransaction& tx) { return tx.HasBackup(); } @@ -17,11 +21,13 @@ struct TBackup { return tx.GetBackup().GetTableName(); } - static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context) { + static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context, TVirtualTimestamp snapshotTime) { const auto& pathId = txState.TargetPathId; Y_VERIFY(context.SS->Tables.contains(pathId)); TTableInfo::TPtr table = context.SS->Tables.at(pathId); NKikimrSchemeOp::TBackupTask backup = table->BackupSettings; + backup.SetSnapshotStep(snapshotTime.Step); + backup.SetSnapshotTxId(snapshotTime.TxId); const auto seqNo = context.SS->StartRound(txState); for (ui32 i = 0; i < txState.Shards.size(); ++i) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp index 88196a65e3..73c1de4bdb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp @@ -9,6 +9,10 @@ struct TRestore { return "TRestore"; } + static constexpr bool NeedSnapshotTime() { + return false; + } + static bool HasTask(const TTxTransaction& tx) { return tx.HasRestore(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 62a1c5845a..6043045380 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -109,6 +109,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId)); } + task.SetSnapshotStep(exportInfo->SnapshotStep); + task.SetSnapshotTxId(exportInfo->SnapshotTxId); + switch (exportInfo->Kind) { case TExportInfo::EKind::YT: { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index b1c373f31a..61062967b0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2672,6 +2672,9 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> { TSet<TActorId> Subscribers; + ui64 SnapshotStep = 0; + ui64 SnapshotTxId = 0; + explicit TExportInfo( const ui64 id, const TString& uid, diff --git a/ydb/core/tx/schemeshard/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup.cpp index be41e95cfe..e59b0e1592 100644 --- a/ydb/core/tx/schemeshard/ut_backup.cpp +++ b/ydb/core/tx/schemeshard/ut_backup.cpp @@ -135,11 +135,11 @@ Y_UNIT_TEST_SUITE(TBackupTests) { } Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) { - ShouldSucceedOnLargeData<Codec>(0, std::make_pair(101, 1)); + ShouldSucceedOnLargeData<Codec>(0, std::make_pair(101, 2)); } Y_UNIT_TEST(ShouldSucceedOnLargeData_MinWriteBatch) { - ShouldSucceedOnLargeData<ECompressionCodec::Zstd>(1 << 20, std::make_pair(0, 2)); + ShouldSucceedOnLargeData<ECompressionCodec::Zstd>(1 << 20, std::make_pair(0, 3)); } } // TBackupTests diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp index 4c17ba3d2a..cc078fc6b1 100644 --- a/ydb/core/tx/schemeshard/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export.cpp @@ -186,15 +186,25 @@ namespace { } // anonymous Y_UNIT_TEST_SUITE(TExportToS3Tests) { - void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& request) { + void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& requestTpl) { TPortManager portManager; const ui16 port = portManager.GetPort(); TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); + auto request = Sprintf(requestTpl.c_str(), port); + TTestEnv env(runtime); - Run(runtime, env, tables, Sprintf(request.c_str(), port), Ydb::StatusIds::SUCCESS, "/MyRoot", false); + Run(runtime, env, tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false); + + for (auto &path : GetExportTargetPaths(request)) { + auto canonPath = (path.StartsWith("/") || path.empty()) ? path : TString("/") + path; + auto it = s3Mock.GetData().find(canonPath + "/metadata.json"); + UNIT_ASSERT(it != s3Mock.GetData().end()); + it = s3Mock.GetData().find(canonPath + "/scheme.pb"); + UNIT_ASSERT(it != s3Mock.GetData().end()); + } } Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { @@ -284,16 +294,6 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { TTestBasicRuntime runtime; TTestEnv env(runtime); - TString scheme; - runtime.SetObserverFunc([&scheme](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { - if (!scheme && ev->GetTypeRewrite() == NWrappers::NExternalStorage::EvPutObjectRequest) { - const auto* msg = ev->Get<NWrappers::NExternalStorage::TEvPutObjectRequest>(); - scheme = msg->Body; - } - - return TTestActorRuntime::EEventAction::PROCESS; - }); - const TVector<TString> tables = {R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } @@ -331,6 +331,11 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { } )", port)); + auto schemeIt = s3Mock.GetData().find("/scheme.pb"); + UNIT_ASSERT(schemeIt != s3Mock.GetData().end()); + + TString scheme = schemeIt->second; + UNIT_ASSERT_NO_DIFF(scheme, R"(columns { name: "key" type { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 78538b1614..6327604dc5 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -971,6 +971,19 @@ namespace NSchemeShardUT_Private { CheckExpectedResult(expectedResults, event->Record.GetStatus(), event->Record.GetResult()); } + TVector<TString> GetExportTargetPaths(const TString& requestStr) { + NKikimrExport::TCreateExportRequest request; + UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request)); + + TVector<TString> result; + + for (auto &item : request.GetExportToS3Settings().items()) { + result.push_back(item.destination_prefix()); + } + + return result; + } + void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) { NKikimrExport::TCreateExportRequest request; UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request)); diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 57bdfb7e00..83b2b03290 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -345,6 +345,7 @@ namespace NSchemeShardUT_Private { NKikimrIndexBuilder::TEvForgetResponse TestForgetBuildIndex(TTestActorRuntime& runtime, const ui64 id, const ui64 schemeShard, const TString &dbName, const ui64 buildIndexId, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); ////////// export + TVector<TString> GetExportTargetPaths(const TString& requestStr); void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", |