diff options
author | Vasily Gerasimov <UgnineSirdis@ydb.tech> | 2025-04-22 13:20:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-22 13:20:37 +0300 |
commit | 63874fb7cb65c52166d07b61c93a3dba2cc80274 (patch) | |
tree | 94deaeb31d1d91e40880a69aa8e8c3aa3fb21181 | |
parent | 73bc54207426e222268e0589263281497de02187 (diff) | |
download | ydb-63874fb7cb65c52166d07b61c93a3dba2cc80274.tar.gz |
Refactor ut_export: move local scope test variables to fixture (#17461)
-rw-r--r-- | ydb/core/tx/schemeshard/ut_export/ut_export.cpp | 1823 |
1 files changed, 820 insertions, 1003 deletions
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 3974307eda4..fa38ffaba4d 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -156,91 +156,6 @@ namespace { TestGetExport(runtime, schemeshardId, exportId, dbName, Ydb::StatusIds::NOT_FOUND); } - using TDelayFunc = std::function<bool(TAutoPtr<IEventHandle>&)>; - - void Cancel(const TVector<TString>& tables, const TString& request, TDelayFunc delayFunc) { - TTestBasicRuntime runtime; - std::vector<std::string> auditLines; - runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); - - TTestEnv env(runtime); - ui64 txId = 100; - - for (const auto& table : tables) { - TestCreateTable(runtime, ++txId, "/MyRoot", table); - env.TestWaitNotification(runtime, txId); - } - - runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); - - THolder<IEventHandle> delayed; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (delayFunc(ev)) { - delayed.Reset(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - return TTestActorRuntime::EEventAction::PROCESS; - }); - - TestExport(runtime, ++txId, "/MyRoot", request); - const ui64 exportId = txId; - - // Check audit record for export start - { - auto line = FindAuditLine(auditLines, "operation=EXPORT START"); - UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); - UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START"); - UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); - UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value - UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); - UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); - UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); - UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); - UNIT_ASSERT(!line.contains("reason")); - UNIT_ASSERT(!line.contains("start_time")); - UNIT_ASSERT(!line.contains("end_time")); - } - - if (!delayed) { - TDispatchOptions opts; - opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { - return bool(delayed); - }); - runtime.DispatchEvents(opts); - } - - runtime.SetObserverFunc(prevObserver); - - TestCancelExport(runtime, ++txId, "/MyRoot", exportId); - runtime.Send(delayed.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); - - // Check audit record for export end - // - { - auto line = FindAuditLine(auditLines, "operation=EXPORT END"); - UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); - UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END"); - UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); - UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value - UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); - UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); - UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); - UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); - UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); - UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); - UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); - } - - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); - - TestForgetExport(runtime, ++txId, "/MyRoot", exportId); - env.TestWaitNotification(runtime, exportId); - - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); - } - const Ydb::Table::PartitioningSettings& GetPartitioningSettings( const Ydb::Table::CreateTableRequest& tableDescription ) { @@ -295,15 +210,15 @@ namespace { class TExportFixture : public NUnitTest::TBaseFixture { public: - void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& requestTpl, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { + void RunS3(const TVector<TString>& tables, const TString& requestTpl, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS, bool checkS3FilesExistence = true) { auto requestStr = Sprintf(requestTpl.c_str(), S3Port()); NKikimrExport::TCreateExportRequest request; UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request)); - TTestEnv env(runtime); - runtime.GetAppData().FeatureFlags.SetEnableEncryptedExport(true); + Env(); // Init test env + Runtime().GetAppData().FeatureFlags.SetEnableEncryptedExport(true); - Run(runtime, env, tables, requestStr, expectedStatus, "/MyRoot", false); + Run(Runtime(), Env(), tables, requestStr, expectedStatus, "/MyRoot", false); auto calcPath = [&](const TString& targetPath, const TString& file) { TString canonPath = (targetPath.StartsWith("/") || targetPath.empty()) ? targetPath : TString("/") + targetPath; @@ -316,7 +231,7 @@ namespace { return result; }; - if (expectedStatus == Ydb::StatusIds::SUCCESS) { + if (expectedStatus == Ydb::StatusIds::SUCCESS && checkS3FilesExistence) { for (auto& path : GetExportTargetPaths(requestStr)) { UNIT_ASSERT_C(HasS3File(calcPath(path, "metadata.json")), calcPath(path, "metadata.json")); UNIT_ASSERT_C(HasS3File(calcPath(path, "scheme.pb")), calcPath(path, "scheme.pb")); @@ -329,6 +244,29 @@ namespace { return it != S3Mock().GetData().end(); } + template <class T> + void CheckHasAllS3Files(std::initializer_list<T> paths) { + for (const T& path : paths) { + UNIT_ASSERT_C(HasS3File(path), "Path \"" << path << "\" is expected to exist in S3"); + } + } + + template <class T> + void CheckNoSuchS3Files(std::initializer_list<T> paths) { + for (const T& path : paths) { + UNIT_ASSERT_C(!HasS3File(path), "Path \"" << path << "\" is expected not to exist in S3"); + } + } + + template <class T> + void CheckNoS3Prefix(std::initializer_list<T> prefixes) { + for (const T& prefix : prefixes) { + for (auto&& [fileName, _] : S3Mock().GetData()) { + UNIT_ASSERT_C(!fileName.StartsWith(prefix), "S3 path \"" << fileName << "\" has prefix \"" << prefix << "\", which is not expected prefix"); + } + } + } + TString GetS3FileContent(const TString& path) { auto it = S3Mock().GetData().find(path); if (it != S3Mock().GetData().end()) { @@ -344,11 +282,350 @@ namespace { } } + using TDelayFunc = std::function<bool(TAutoPtr<IEventHandle>&)>; + + void Cancel(const TVector<TString>& tables, const TString& request, TDelayFunc delayFunc) { + std::vector<std::string> auditLines; + Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + + Env(); // Init test env + ui64 txId = 100; + + for (const auto& table : tables) { + TestCreateTable(Runtime(), ++txId, "/MyRoot", table); + Env().TestWaitNotification(Runtime(), txId); + } + + Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + + THolder<IEventHandle> delayed; + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + if (delayFunc(ev)) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TestExport(Runtime(), ++txId, "/MyRoot", request); + const ui64 exportId = txId; + + // Check audit record for export start + { + auto line = FindAuditLine(auditLines, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { + return bool(delayed); + }); + Runtime().DispatchEvents(opts); + } + + Runtime().SetObserverFunc(prevObserver); + + TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId); + Runtime().Send(delayed.Release(), 0, true); + Env().TestWaitNotification(Runtime(), exportId); + + // Check audit record for export end + // + { + auto line = FindAuditLine(auditLines, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); + UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + + TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId); + Env().TestWaitNotification(Runtime(), exportId); + + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + } + + void CancelUponTransferringShouldSucceed(const TVector<TString>& tables, const TString& request) { + Cancel(tables, Sprintf(request.c_str(), S3Port()), [](TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { + return false; + } + + return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record + .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup; + }); + } + + void CancelShouldSucceed(TDelayFunc delayFunc) { + Cancel({ + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port()), delayFunc); + } + + void DropCopiesBeforeTransferring(ui32 tablesCount) { + Env(); // Init test env + ui64 txId = 100; + + for (ui32 i = 1; i <= tablesCount; ++i) { + TestCreateTable(Runtime(), ++txId, "/MyRoot", Sprintf(R"( + Name: "Table%d" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", i)); + Env().TestWaitNotification(Runtime(), txId); + } + + Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + + bool dropNotification = false; + THolder<IEventHandle> delayed; + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvSchemeShard::EvModifySchemeTransaction: + break; + case TEvSchemeShard::EvNotifyTxCompletionResult: + if (dropNotification) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + default: + return TTestActorRuntime::EEventAction::PROCESS; + } + + const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>(); + if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) { + dropNotification = true; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TStringBuilder items; + for (ui32 i = 1; i <= tablesCount; ++i) { + items << "items {" + << " source_path: \"/MyRoot/Table" << i << "\"" + << " destination_prefix: \"\"" + << " }"; + } + + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + %s + } + )", S3Port(), items.c_str())); + const ui64 exportId = txId; + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { + return bool(delayed); + }); + Runtime().DispatchEvents(opts); + } + + Runtime().SetObserverFunc(prevObserver); + + for (ui32 i = 0; i < tablesCount; ++i) { + TestDropTable(Runtime(), ++txId, Sprintf("/MyRoot/export-%" PRIu64, exportId), ToString(i)); + Env().TestWaitNotification(Runtime(), txId); + } + + Runtime().Send(delayed.Release(), 0, true); + Env().TestWaitNotification(Runtime(), exportId); + + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + + TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId); + Env().TestWaitNotification(Runtime(), exportId); + + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + } + + void RebootDuringFinish(bool rejectUploadParts, Ydb::StatusIds::StatusCode expectedStatus) { + S3Settings().WithRejectUploadParts(rejectUploadParts); + + Env(); // Init test env + ui64 txId = 100; + + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + Env().TestWaitNotification(Runtime(), txId); + + UpdateRow(Runtime(), "Table", 1, "valueA"); + UpdateRow(Runtime(), "Table", 2, "valueB"); + + Runtime().SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + + TMaybe<ui64> backupTxId; + TMaybe<ui64> tabletId; + bool delayed = false; + + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvProposeTransaction: { + auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record; + if (record.GetTxKind() != NKikimrTxDataShard::ETransactionKind::TX_KIND_SCHEME) { + return TTestActorRuntime::EEventAction::PROCESS; + } + + NKikimrTxDataShard::TFlatSchemeTransaction schemeTx; + UNIT_ASSERT(schemeTx.ParseFromString(record.GetTxBody())); + + if (schemeTx.HasBackup()) { + backupTxId = record.GetTxId(); + // hijack + schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1); + record.SetTxBody(schemeTx.SerializeAsString()); + } + + return TTestActorRuntime::EEventAction::PROCESS; + } + + case TEvDataShard::EvProposeTransactionResult: { + if (!backupTxId) { + return TTestActorRuntime::EEventAction::PROCESS; + } + + const auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record; + if (record.GetTxId() != *backupTxId) { + return TTestActorRuntime::EEventAction::PROCESS; + } + + tabletId = record.GetOrigin(); + return TTestActorRuntime::EEventAction::PROCESS; + } + + case NWrappers::NExternalStorage::EvCompleteMultipartUploadRequest: + case NWrappers::NExternalStorage::EvAbortMultipartUploadRequest: + delayed = true; + return TTestActorRuntime::EEventAction::DROP; + + default: + return TTestActorRuntime::EEventAction::PROCESS; + } + }); + + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port())); + const ui64 exportId = txId; + + if (!delayed || !tabletId) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed, &tabletId](IEventHandle&) -> bool { + return delayed && tabletId; + }); + Runtime().DispatchEvents(opts); + } + + Runtime().SetObserverFunc(prevObserver); + + RebootTablet(Runtime(), *tabletId, Runtime().AllocateEdgeActor()); + Env().TestWaitNotification(Runtime(), exportId); + + TestGetExport(Runtime(), exportId, "/MyRoot", expectedStatus); + + TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId); + Env().TestWaitNotification(Runtime(), exportId); + + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + } + + void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) { + const TString userSID = "user@builtin"; + EnvOptions().SystemBackupSIDs({userSID}); + Env(); // Init test env + + SetSchemeshardSchemaLimits(Runtime(), limits); + + const TVector<TString> tables = { + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }; + const TString request = Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port()); + + Run(Runtime(), Env(), tables, request, expectedFailStatus); + Run(Runtime(), Env(), tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID); + } + protected: + TS3Mock::TSettings& S3Settings() { + if (!S3ServerSettings) { + S3ServerPort = PortManager.GetPort(); + S3ServerSettings.ConstructInPlace(S3ServerPort); + } + return *S3ServerSettings; + } + TS3Mock& S3Mock() { if (!S3ServerMock) { - S3ServerPort = PortManager.GetPort(); - S3ServerMock.ConstructInPlace(TS3Mock::TSettings(S3ServerPort)); + S3ServerMock.ConstructInPlace(S3Settings()); UNIT_ASSERT(S3ServerMock->Start()); } return *S3ServerMock; @@ -359,19 +636,42 @@ namespace { return S3ServerPort; } + TTestBasicRuntime& Runtime() { + if (!TestRuntime) { + TestRuntime.ConstructInPlace(); + } + return *TestRuntime; + } + + TTestEnvOptions& EnvOptions() { + if (!TestEnvOptions) { + TestEnvOptions.ConstructInPlace(); + } + return *TestEnvOptions; + } + + TTestEnv& Env() { + if (!TestEnv) { + TestEnv.ConstructInPlace(Runtime(), EnvOptions()); + } + return *TestEnv; + } + private: TPortManager PortManager; ui16 S3ServerPort = 0; + TMaybe<TTestBasicRuntime> TestRuntime; + TMaybe<TS3Mock::TSettings> S3ServerSettings; TMaybe<TS3Mock> S3ServerMock; + TMaybe<TTestEnvOptions> TestEnvOptions; + TMaybe<TTestEnv> TestEnv; }; } // anonymous Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } @@ -391,9 +691,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { } Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -414,9 +712,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { } Y_UNIT_TEST(ShouldSucceedOnManyTables) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table1" Columns { Name: "key" Type: "Utf8" } @@ -446,52 +742,43 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { } Y_UNIT_TEST(ShouldOmitNonStrictStorageSettings) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TTestBasicRuntime runtime; - TTestEnv env(runtime); - const TVector<TString> tables = {R"( Name: "Table" Columns { - Name: "key" - Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "b" - } + Name: "key" + Type: "Utf8" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 } + } + } + value { + items { + text_value: "b" + } } + } } Columns { - Name: "value" - Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } + Name: "value" + Type: "Utf8" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 } + } } + value { + items { + text_value: "a" + } + } + } } KeyColumnNames: ["key"] PartitionConfig { @@ -515,7 +802,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { } )"}; - Run(runtime, env, tables, Sprintf(R"( + Run(Runtime(), Env(), tables, Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -524,10 +811,10 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) { destination_prefix: "" } } - )", port)); + )", S3Port())); - auto schemeIt = s3Mock.GetData().find("/scheme.pb"); - UNIT_ASSERT(schemeIt != s3Mock.GetData().end()); + auto schemeIt = S3Mock().GetData().find("/scheme.pb"); + UNIT_ASSERT(schemeIt != S3Mock().GetData().end()); TString scheme = schemeIt->second; @@ -596,54 +883,45 @@ partitioning_settings { } Y_UNIT_TEST(ShouldPreserveIncrBackupFlag) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TTestBasicRuntime runtime; - TTestEnv env(runtime); - const TTablesWithAttrs tables{ { R"( Name: "Table" Columns { - Name: "key" - Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "b" - } + Name: "key" + Type: "Utf8" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 } + } } + value { + items { + text_value: "b" + } + } + } } Columns { - Name: "value" - Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } + Name: "value" + Type: "Utf8" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 } + } } + value { + items { + text_value: "a" + } + } + } } KeyColumnNames: ["key"] )", @@ -651,7 +929,7 @@ partitioning_settings { }, }; - Run(runtime, env, tables, Sprintf(R"( + Run(Runtime(), Env(), tables, Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -660,10 +938,10 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); - auto schemeIt = s3Mock.GetData().find("/scheme.pb"); - UNIT_ASSERT(schemeIt != s3Mock.GetData().end()); + auto schemeIt = S3Mock().GetData().find("/scheme.pb"); + UNIT_ASSERT(schemeIt != S3Mock().GetData().end()); TString scheme = schemeIt->second; @@ -728,32 +1006,6 @@ partitioning_settings { )"); } - void CancelShouldSucceed(TDelayFunc delayFunc) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - Cancel({ - R"( - Name: "Table" - Columns { Name: "key" Type: "Utf8" } - Columns { Name: "value" Type: "Utf8" } - KeyColumnNames: ["key"] - )", - }, Sprintf(R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", port), delayFunc); - } - Y_UNIT_TEST(CancelUponCreatingExportDirShouldSucceed) { CancelShouldSucceed([](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { @@ -776,23 +1028,6 @@ partitioning_settings { }); } - void CancelUponTransferringShouldSucceed(const TVector<TString>& tables, const TString& request) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - Cancel(tables, Sprintf(request.c_str(), port), [](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { - return false; - } - - return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup; - }); - } - Y_UNIT_TEST(CancelUponTransferringSingleShardTableShouldSucceed) { CancelUponTransferringShouldSucceed({ R"( @@ -869,24 +1104,23 @@ partitioning_settings { } Y_UNIT_TEST(DropSourceTableBeforeTransferring) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); - runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); bool dropNotification = false; THolder<IEventHandle> delayed; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { case TEvSchemeShard::EvModifySchemeTransaction: break; @@ -908,13 +1142,7 @@ partitioning_settings { return TTestActorRuntime::EEventAction::PROCESS; }); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -923,96 +1151,7 @@ partitioning_settings { destination_prefix: "" } } - )", port)); - const ui64 exportId = txId; - - if (!delayed) { - TDispatchOptions opts; - opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { - return bool(delayed); - }); - runtime.DispatchEvents(opts); - } - - runtime.SetObserverFunc(prevObserver); - - TestDropTable(runtime, ++txId, "/MyRoot", "Table"); - env.TestWaitNotification(runtime, txId); - - runtime.Send(delayed.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); - - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); - - TestForgetExport(runtime, ++txId, "/MyRoot", exportId); - env.TestWaitNotification(runtime, exportId); - - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); - } - - void DropCopiesBeforeTransferring(ui32 tablesCount) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - ui64 txId = 100; - - for (ui32 i = 1; i <= tablesCount; ++i) { - TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"( - Name: "Table%d" - Columns { Name: "key" Type: "Utf8" } - Columns { Name: "value" Type: "Utf8" } - KeyColumnNames: ["key"] - )", i)); - env.TestWaitNotification(runtime, txId); - } - - runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); - - bool dropNotification = false; - THolder<IEventHandle> delayed; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - case TEvSchemeShard::EvModifySchemeTransaction: - break; - case TEvSchemeShard::EvNotifyTxCompletionResult: - if (dropNotification) { - delayed.Reset(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - return TTestActorRuntime::EEventAction::PROCESS; - default: - return TTestActorRuntime::EEventAction::PROCESS; - } - - const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>(); - if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) { - dropNotification = true; - } - - return TTestActorRuntime::EEventAction::PROCESS; - }); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TStringBuilder items; - for (ui32 i = 1; i <= tablesCount; ++i) { - items << "items {" - << " source_path: \"/MyRoot/Table" << i << "\"" - << " destination_prefix: \"\"" - << " }"; - } - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - %s - } - )", port, items.c_str())); + )", S3Port())); const ui64 exportId = txId; if (!delayed) { @@ -1020,25 +1159,23 @@ partitioning_settings { opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { return bool(delayed); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(prevObserver); + Runtime().SetObserverFunc(prevObserver); - for (ui32 i = 0; i < tablesCount; ++i) { - TestDropTable(runtime, ++txId, Sprintf("/MyRoot/export-%" PRIu64, exportId), ToString(i)); - env.TestWaitNotification(runtime, txId); - } + TestDropTable(Runtime(), ++txId, "/MyRoot", "Table"); + Env().TestWaitNotification(Runtime(), txId); - runtime.Send(delayed.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); + Runtime().Send(delayed.Release(), 0, true); + Env().TestWaitNotification(Runtime(), exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); - TestForgetExport(runtime, ++txId, "/MyRoot", exportId); - env.TestWaitNotification(runtime, exportId); + TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId); + Env().TestWaitNotification(Runtime(), exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); } Y_UNIT_TEST(DropCopiesBeforeTransferring1) { @@ -1049,114 +1186,6 @@ partitioning_settings { DropCopiesBeforeTransferring(2); } - void RebootDuringFinish(bool rejectUploadParts, Ydb::StatusIds::StatusCode expectedStatus) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - ui64 txId = 100; - - TestCreateTable(runtime, ++txId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint32" } - Columns { Name: "value" Type: "Utf8" } - KeyColumnNames: ["key"] - )"); - env.TestWaitNotification(runtime, txId); - - UpdateRow(runtime, "Table", 1, "valueA"); - UpdateRow(runtime, "Table", 2, "valueB"); - - runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); - - TMaybe<ui64> backupTxId; - TMaybe<ui64> tabletId; - bool delayed = false; - - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - case TEvDataShard::EvProposeTransaction: { - auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record; - if (record.GetTxKind() != NKikimrTxDataShard::ETransactionKind::TX_KIND_SCHEME) { - return TTestActorRuntime::EEventAction::PROCESS; - } - - NKikimrTxDataShard::TFlatSchemeTransaction schemeTx; - UNIT_ASSERT(schemeTx.ParseFromString(record.GetTxBody())); - - if (schemeTx.HasBackup()) { - backupTxId = record.GetTxId(); - // hijack - schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1); - record.SetTxBody(schemeTx.SerializeAsString()); - } - - return TTestActorRuntime::EEventAction::PROCESS; - } - - case TEvDataShard::EvProposeTransactionResult: { - if (!backupTxId) { - return TTestActorRuntime::EEventAction::PROCESS; - } - - const auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record; - if (record.GetTxId() != *backupTxId) { - return TTestActorRuntime::EEventAction::PROCESS; - } - - tabletId = record.GetOrigin(); - return TTestActorRuntime::EEventAction::PROCESS; - } - - case NWrappers::NExternalStorage::EvCompleteMultipartUploadRequest: - case NWrappers::NExternalStorage::EvAbortMultipartUploadRequest: - delayed = true; - return TTestActorRuntime::EEventAction::DROP; - - default: - return TTestActorRuntime::EEventAction::PROCESS; - } - }); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port).WithRejectUploadParts(rejectUploadParts)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", port)); - const ui64 exportId = txId; - - if (!delayed || !tabletId) { - TDispatchOptions opts; - opts.FinalEvents.emplace_back([&delayed, &tabletId](IEventHandle&) -> bool { - return delayed && tabletId; - }); - runtime.DispatchEvents(opts); - } - - runtime.SetObserverFunc(prevObserver); - - RebootTablet(runtime, *tabletId, runtime.AllocateEdgeActor()); - env.TestWaitNotification(runtime, exportId); - - TestGetExport(runtime, exportId, "/MyRoot", expectedStatus); - - TestForgetExport(runtime, ++txId, "/MyRoot", exportId); - env.TestWaitNotification(runtime, exportId); - - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); - } - Y_UNIT_TEST(RebootDuringCompletion) { RebootDuringFinish(false, Ydb::StatusIds::SUCCESS); } @@ -1166,12 +1195,12 @@ partitioning_settings { } Y_UNIT_TEST(ShouldExcludeBackupTableFromStats) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true)); + EnvOptions().DisableStatsBatching(true); + Env(); // Init test env ui64 txId = 100; THashSet<ui64> statsCollected; - runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvDataShard::EvPeriodicTableStats) { statsCollected.insert(ev->Get<TEvDataShard::TEvPeriodicTableStats>()->Record.GetDatashardId()); } @@ -1186,41 +1215,35 @@ partitioning_settings { opts.FinalEvents.emplace_back([&](IEventHandle&) -> bool { return statsCollected.size() == count; }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); - return DescribePath(runtime, "/MyRoot") + return DescribePath(Runtime(), "/MyRoot") .GetPathDescription() .GetDomainDescription() .GetDiskSpaceUsage(); }; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); for (int i = 1; i < 500; ++i) { - UpdateRow(runtime, "Table", i, "value"); + UpdateRow(Runtime(), "Table", i, "value"); } // trigger memtable's compaction - TestCopyTable(runtime, ++txId, "/MyRoot", "CopyTable", "/MyRoot/Table"); - env.TestWaitNotification(runtime, txId); - TestDropTable(runtime, ++txId, "/MyRoot", "Table"); - env.TestWaitNotification(runtime, txId); + TestCopyTable(Runtime(), ++txId, "/MyRoot", "CopyTable", "/MyRoot/Table"); + Env().TestWaitNotification(Runtime(), txId); + TestDropTable(Runtime(), ++txId, "/MyRoot", "Table"); + Env().TestWaitNotification(Runtime(), txId); const auto expected = waitForStats(1); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1229,15 +1252,15 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); const ui64 exportId = txId; ::NKikimrSubDomains::TDiskSpaceUsage afterExport; TTestActorRuntime::TEventObserver prevObserverFunc; - prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { + prevObserverFunc = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { if (auto* p = event->CastAsLocal<TEvSchemeShard::TEvModifySchemeTransaction>()) { auto& record = p->Record; - if (record.TransactionSize() >= 1 && + if (record.TransactionSize() >= 1 && record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpDropTable) { afterExport = waitForStats(2); } @@ -1245,50 +1268,43 @@ partitioning_settings { return prevObserverFunc(event); }); - env.TestWaitNotification(runtime, exportId); + Env().TestWaitNotification(Runtime(), exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterExport.DebugString()); - TestForgetExport(runtime, ++txId, "/MyRoot", exportId); - env.TestWaitNotification(runtime, exportId); + TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId); + Env().TestWaitNotification(Runtime(), exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); const auto afterForget = waitForStats(1); UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterForget.DebugString()); } Y_UNIT_TEST(CheckItemProgress) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(runtime, [](auto&& ev) { + TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(Runtime(), [](auto&& ev) { return ev->Get()->Request.GetKey() == "/data_01.csv"; }); - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 10 } }}} )"); - env.TestWaitNotification(runtime, txId); - - WriteRow(runtime, ++txId, "/MyRoot/Table", 0, 1, "v1"); - env.TestWaitNotification(runtime, txId); - WriteRow(runtime, ++txId, "/MyRoot/Table", 1, 100, "v100"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); + Env().TestWaitNotification(Runtime(), txId); - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + WriteRow(Runtime(), ++txId, "/MyRoot/Table", 0, 1, "v1"); + Env().TestWaitNotification(Runtime(), txId); + WriteRow(Runtime(), ++txId, "/MyRoot/Table", 1, 100, "v100"); + Env().TestWaitNotification(Runtime(), txId); - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1297,14 +1313,13 @@ partitioning_settings { destination_prefix: "" } } - )", port)); - + )", S3Port())); - runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; }); + Runtime().WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; }); bool isCompleted = false; while (!isCompleted) { - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto entry = desc.GetResponse().GetEntry(); const auto& item = entry.GetItemsProgress(0); @@ -1314,44 +1329,37 @@ partitioning_settings { UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1); UNIT_ASSERT(item.has_start_time()); } else { - runtime.SimulateSleep(TDuration::Seconds(1)); + Runtime().SimulateSleep(TDuration::Seconds(1)); } } blockPartition01.Stop(); blockPartition01.Unblock(); - - env.TestWaitNotification(runtime, txId); - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + Env().TestWaitNotification(Runtime(), txId); + + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1); } Y_UNIT_TEST(ShouldRestartOnScanErrors) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - UpdateRow(runtime, "Table", 1, "valueA"); + Env().TestWaitNotification(Runtime(), txId); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + UpdateRow(Runtime(), "Table", 1, "valueA"); THolder<IEventHandle> injectResult; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == NSharedCache::EvResult) { const auto* msg = ev->Get<NSharedCache::TEvResult>(); UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK); @@ -1366,7 +1374,7 @@ partitioning_settings { return TTestActorRuntime::EEventAction::PROCESS; }); - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1375,44 +1383,37 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); if (!injectResult) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool { return bool(injectResult); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(prevObserver); - runtime.Send(injectResult.Release(), 0, true); + Runtime().SetObserverFunc(prevObserver); + Runtime().Send(injectResult.Release(), 0, true); - env.TestWaitNotification(runtime, txId); - TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + Env().TestWaitNotification(Runtime(), txId); + TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::SUCCESS); } Y_UNIT_TEST(ShouldSucceedOnConcurrentTxs) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); THolder<IEventHandle> copyTables; - auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) { const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record; if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) { @@ -1424,7 +1425,7 @@ partitioning_settings { }); const auto exportId = ++txId; - TestExport(runtime, exportId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), exportId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1433,18 +1434,18 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); if (!copyTables) { TDispatchOptions opts; opts.FinalEvents.emplace_back([©Tables](IEventHandle&) -> bool { return bool(copyTables); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } THolder<IEventHandle> proposeTxResult; - runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvDataShard::EvProposeTransactionResult) { proposeTxResult.Reset(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -1452,7 +1453,7 @@ partitioning_settings { return TTestActorRuntime::EEventAction::PROCESS; }); - TestAlterTable(runtime, ++txId, "/MyRoot", R"( + TestAlterTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "extra" Type: "Utf8"} )"); @@ -1462,39 +1463,32 @@ partitioning_settings { opts.FinalEvents.emplace_back([&proposeTxResult](IEventHandle&) -> bool { return bool(proposeTxResult); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(origObserver); - runtime.Send(copyTables.Release(), 0, true); - runtime.Send(proposeTxResult.Release(), 0, true); - env.TestWaitNotification(runtime, txId); + Runtime().SetObserverFunc(origObserver); + Runtime().Send(copyTables.Release(), 0, true); + Runtime().Send(proposeTxResult.Release(), 0, true); + Env().TestWaitNotification(Runtime(), txId); - env.TestWaitNotification(runtime, exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); + Env().TestWaitNotification(Runtime(), exportId); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); } Y_UNIT_TEST(ShouldSucceedOnConcurrentExport) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); TVector<THolder<IEventHandle>> copyTables; - auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) { const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record; if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) { @@ -1504,20 +1498,20 @@ partitioning_settings { } return TTestActorRuntime::EEventAction::PROCESS; }); - auto waitCopyTables = [&runtime, ©Tables](ui32 size) { + auto waitCopyTables = [this, ©Tables](ui32 size) { if (copyTables.size() != size) { TDispatchOptions opts; opts.FinalEvents.emplace_back([©Tables, size](IEventHandle&) -> bool { return copyTables.size() == size; }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } }; TVector<ui64> exportIds; for (ui32 i = 1; i <= 3; ++i) { exportIds.push_back(++txId); - TestExport(runtime, exportIds[i - 1], "/MyRoot", Sprintf(R"( + TestExport(Runtime(), exportIds[i - 1], "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1526,42 +1520,35 @@ partitioning_settings { destination_prefix: "Table%u" } } - )", port, i)); + )", S3Port(), i)); waitCopyTables(i); } - runtime.SetObserverFunc(origObserver); + Runtime().SetObserverFunc(origObserver); for (auto& ev : copyTables) { - runtime.Send(ev.Release(), 0, true); + Runtime().Send(ev.Release(), 0, true); } for (ui64 exportId : exportIds) { - env.TestWaitNotification(runtime, exportId); - TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); + Env().TestWaitNotification(Runtime(), exportId); + TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); } } Y_UNIT_TEST(ShouldSucceedOnConcurrentImport) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); // prepare backup data - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1570,12 +1557,12 @@ partitioning_settings { destination_prefix: "Backup1" } } - )", port)); - env.TestWaitNotification(runtime, txId); - TestGetExport(runtime, txId, "/MyRoot"); + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); + TestGetExport(Runtime(), txId, "/MyRoot"); TVector<THolder<IEventHandle>> delayed; - auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) { const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record; const auto opType = record.GetTransaction(0).GetOperationType(); @@ -1591,18 +1578,18 @@ partitioning_settings { return TTestActorRuntime::EEventAction::PROCESS; }); - auto waitForDelayed = [&runtime, &delayed](ui32 size) { + auto waitForDelayed = [this, &delayed](ui32 size) { if (delayed.size() != size) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&delayed, size](IEventHandle&) -> bool { return delayed.size() == size; }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } }; const auto importId = ++txId; - TestImport(runtime, importId, "/MyRoot", Sprintf(R"( + TestImport(Runtime(), importId, "/MyRoot", Sprintf(R"( ImportFromS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1611,12 +1598,12 @@ partitioning_settings { destination_path: "/MyRoot/Restored" } } - )", port)); + )", S3Port())); // wait for restore op waitForDelayed(1); const auto exportId = ++txId; - TestExport(runtime, exportId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), exportId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1625,87 +1612,47 @@ partitioning_settings { destination_prefix: "Backup2" } } - )", port)); + )", S3Port())); // wait for copy table op waitForDelayed(2); - runtime.SetObserverFunc(origObserver); + Runtime().SetObserverFunc(origObserver); for (auto& ev : delayed) { - runtime.Send(ev.Release(), 0, true); + Runtime().Send(ev.Release(), 0, true); } - env.TestWaitNotification(runtime, importId); - TestGetImport(runtime, importId, "/MyRoot"); - env.TestWaitNotification(runtime, exportId); - TestGetExport(runtime, exportId, "/MyRoot"); + Env().TestWaitNotification(Runtime(), importId); + TestGetImport(Runtime(), importId, "/MyRoot"); + Env().TestWaitNotification(Runtime(), exportId); + TestGetExport(Runtime(), exportId, "/MyRoot"); } - void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - const TString userSID = "user@builtin"; - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().SystemBackupSIDs({userSID})); - - SetSchemeshardSchemaLimits(runtime, limits); - - const TVector<TString> tables = { - R"( - Name: "Table" - Columns { Name: "key" Type: "Utf8" } - Columns { Name: "value" Type: "Utf8" } - KeyColumnNames: ["key"] - )", - }; - const TString request = Sprintf(R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", port); - - Run(runtime, env, tables, request, expectedFailStatus); - Run(runtime, env, tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID); + Y_UNIT_TEST(ShouldCheckQuotasExportsLimited) { + ShouldCheckQuotas(TSchemeLimits{.MaxExports = 0}, Ydb::StatusIds::PRECONDITION_FAILED); } - Y_UNIT_TEST(ShouldCheckQuotas) { - ShouldCheckQuotas(TSchemeLimits{.MaxExports = 0}, Ydb::StatusIds::PRECONDITION_FAILED); + Y_UNIT_TEST(ShouldCheckQuotasChildrenLimited) { ShouldCheckQuotas(TSchemeLimits{.MaxChildrenInDir = 1}, Ydb::StatusIds::CANCELLED); } Y_UNIT_TEST(ShouldRetryAtFinalStage) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - UpdateRow(runtime, "Table", 1, "valueA"); - UpdateRow(runtime, "Table", 2, "valueB"); - runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); + Env().TestWaitNotification(Runtime(), txId); - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + UpdateRow(Runtime(), "Table", 1, "valueA"); + UpdateRow(Runtime(), "Table", 2, "valueB"); + Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); THolder<IEventHandle> injectResult; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { case TEvDataShard::EvProposeTransaction: { auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record; @@ -1742,7 +1689,7 @@ partitioning_settings { }); const auto exportId = ++txId; - TestExport(runtime, txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1752,78 +1699,65 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); if (!injectResult) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool { return bool(injectResult); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(prevObserver); - runtime.Send(injectResult.Release(), 0, true); + Runtime().SetObserverFunc(prevObserver); + Runtime().Send(injectResult.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); - TestGetExport(runtime, exportId, "/MyRoot"); + Env().TestWaitNotification(Runtime(), exportId); + TestGetExport(Runtime(), exportId, "/MyRoot"); } Y_UNIT_TEST(CorruptedDyNumber) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true)); + EnvOptions().DisableStatsBatching(true); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "DyNumber" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); // Write bad DyNumber - UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", port)); - env.TestWaitNotification(runtime, txId); + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); - TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); + TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::CANCELLED); } Y_UNIT_TEST(UidAsIdempotencyKey) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); const auto request = Sprintf(R"( OperationParams { @@ -1840,41 +1774,34 @@ partitioning_settings { destination_prefix: "" } } - )", port); + )", S3Port()); // create operation - TestExport(runtime, ++txId, "/MyRoot", request); + TestExport(Runtime(), ++txId, "/MyRoot", request); const ui64 exportId = txId; // create operation again with same uid - TestExport(runtime, ++txId, "/MyRoot", request); + TestExport(Runtime(), ++txId, "/MyRoot", request); // new operation was not created - TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); // check previous operation - TestGetExport(runtime, exportId, "/MyRoot"); - env.TestWaitNotification(runtime, exportId); + TestGetExport(Runtime(), exportId, "/MyRoot"); + Env().TestWaitNotification(Runtime(), exportId); } Y_UNIT_TEST(ExportStartTime) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - runtime.UpdateCurrentTime(TInstant::Now()); + Env(); // Init test env + Runtime().UpdateCurrentTime(TInstant::Now()); ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1883,9 +1810,9 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto& entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_PREPARING); UNIT_ASSERT(entry.HasStartTime()); @@ -1893,26 +1820,19 @@ partitioning_settings { } Y_UNIT_TEST(CompletedExportEndTime) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - runtime.UpdateCurrentTime(TInstant::Now()); + Env(); // Init test env + Runtime().UpdateCurrentTime(TInstant::Now()); ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1921,13 +1841,13 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); - runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export + Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); // doing export - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto& entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_DONE); UNIT_ASSERT(entry.HasStartTime()); @@ -1936,18 +1856,17 @@ partitioning_settings { } Y_UNIT_TEST(CancelledExportEndTime) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - runtime.UpdateCurrentTime(TInstant::Now()); + Env(); // Init test env + Runtime().UpdateCurrentTime(TInstant::Now()); ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); auto delayFunc = [](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { @@ -1959,7 +1878,7 @@ partitioning_settings { }; THolder<IEventHandle> delayed; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (delayFunc(ev)) { delayed.Reset(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -1967,13 +1886,7 @@ partitioning_settings { return TTestActorRuntime::EEventAction::PROCESS; }); - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -1982,32 +1895,32 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); const ui64 exportId = txId; - runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export + Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); // doing export if (!delayed) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { return bool(delayed); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(prevObserver); + Runtime().SetObserverFunc(prevObserver); - TestCancelExport(runtime, ++txId, "/MyRoot", exportId); + TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId); - auto desc = TestGetExport(runtime, exportId, "/MyRoot"); + auto desc = TestGetExport(Runtime(), exportId, "/MyRoot"); auto entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLATION); UNIT_ASSERT(entry.HasStartTime()); UNIT_ASSERT(!entry.HasEndTime()); - runtime.Send(delayed.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); + Runtime().Send(delayed.Release(), 0, true); + Env().TestWaitNotification(Runtime(), exportId); - desc = TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + desc = TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLED); UNIT_ASSERT(entry.HasStartTime()); @@ -2017,50 +1930,41 @@ partitioning_settings { // Based on CompletedExportEndTime Y_UNIT_TEST(AuditCompletedExport) { - TTestBasicRuntime runtime; std::vector<std::string> auditLines; - runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); - - TTestEnv env(runtime); - - runtime.UpdateCurrentTime(TInstant::Now()); + Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + Env(); // Init test env + Runtime().UpdateCurrentTime(TInstant::Now()); ui64 txId = 100; // Prepare table to export // - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); // Start export // - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - const auto request = Sprintf(R"( OperationParams { - labels { - key: "uid" - value: "foo" - } + labels { + key: "uid" + value: "foo" + } } ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } } - )", port); - TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + )", S3Port()); + TestExport(Runtime(), ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); // Check audit record for export start { @@ -2081,11 +1985,11 @@ partitioning_settings { // Do export // - runtime.AdvanceCurrentTime(TDuration::Seconds(30)); + Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto& entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_DONE); UNIT_ASSERT(entry.HasStartTime()); @@ -2111,24 +2015,21 @@ partitioning_settings { } Y_UNIT_TEST(AuditCancelledExport) { - TTestBasicRuntime runtime; std::vector<std::string> auditLines; - runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); - - TTestEnv env(runtime); - - runtime.UpdateCurrentTime(TInstant::Now()); + Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + Env(); // Init test env + Runtime().UpdateCurrentTime(TInstant::Now()); ui64 txId = 100; // Prepare table to export // - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); auto delayFunc = [](TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { @@ -2140,7 +2041,7 @@ partitioning_settings { }; THolder<IEventHandle> delayed; - auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { if (delayFunc(ev)) { delayed.Reset(ev.Release()); return TTestActorRuntime::EEventAction::DROP; @@ -2150,18 +2051,12 @@ partitioning_settings { // Start export // - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - const auto request = Sprintf(R"( OperationParams { - labels { - key: "uid" - value: "foo" - } + labels { + key: "uid" + value: "foo" + } } ExportToS3Settings { endpoint: "localhost:%d" @@ -2171,8 +2066,8 @@ partitioning_settings { destination_prefix: "" } } - )", port); - TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + )", S3Port()); + TestExport(Runtime(), ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); const ui64 exportId = txId; // Check audit record for export start @@ -2194,31 +2089,31 @@ partitioning_settings { // Do export (unsuccessfully) // - runtime.AdvanceCurrentTime(TDuration::Seconds(30)); + Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); if (!delayed) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { return bool(delayed); }); - runtime.DispatchEvents(opts); + Runtime().DispatchEvents(opts); } - runtime.SetObserverFunc(prevObserver); + Runtime().SetObserverFunc(prevObserver); // Cancel export mid-air // - TestCancelExport(runtime, ++txId, "/MyRoot", exportId); + TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId); - auto desc = TestGetExport(runtime, exportId, "/MyRoot"); + auto desc = TestGetExport(Runtime(), exportId, "/MyRoot"); auto entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLATION); UNIT_ASSERT(entry.HasStartTime()); UNIT_ASSERT(!entry.HasEndTime()); - runtime.Send(delayed.Release(), 0, true); - env.TestWaitNotification(runtime, exportId); + Runtime().Send(delayed.Release(), 0, true); + Env().TestWaitNotification(Runtime(), exportId); - desc = TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + desc = TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLED); UNIT_ASSERT(entry.HasStartTime()); @@ -2245,47 +2140,35 @@ partitioning_settings { } Y_UNIT_TEST(ExportPartitioningSettings) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TTestBasicRuntime runtime; - TTestEnv env(runtime); - - Run(runtime, env, TVector<TString>{ + Run(Runtime(), Env(), TVector<TString>{ R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] PartitionConfig { - PartitioningPolicy { - MinPartitionsCount: 10 - SplitByLoadSettings: { - Enabled: true - } + PartitioningPolicy { + MinPartitionsCount: 10 + SplitByLoadSettings: { + Enabled: true } + } } )" }, - Sprintf( - R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", - port - ) + Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port()) ); - auto* scheme = s3Mock.GetData().FindPtr("/scheme.pb"); + auto* scheme = S3Mock().GetData().FindPtr("/scheme.pb"); UNIT_ASSERT(scheme); CheckTableScheme(*scheme, GetPartitioningSettings, CreateProtoComparator(R"( partitioning_by_size: DISABLED @@ -2295,57 +2178,46 @@ partitioning_settings { } Y_UNIT_TEST(ExportIndexTablePartitioningSettings) { - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TestCreateIndexedTable(Runtime(), ++txId, "/MyRoot", R"( TableDescription { - Name: "Table" - Columns { Name: "key" Type: "Uint32" } - Columns { Name: "value" Type: "Utf8" } - KeyColumnNames: ["key"] + Name: "Table" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] } IndexDescription { - Name: "ByValue" - KeyColumnNames: ["value"] - IndexImplTableDescriptions: [ { - PartitionConfig { - PartitioningPolicy { - MinPartitionsCount: 10 - SplitByLoadSettings: { - Enabled: true - } - } + Name: "ByValue" + KeyColumnNames: ["value"] + IndexImplTableDescriptions: [ { + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 10 + SplitByLoadSettings: { + Enabled: true } - } ] + } + } + } ] } )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); - TestExport(runtime, ++txId, "/MyRoot", Sprintf( - R"( - ExportToS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_path: "/MyRoot/Table" - destination_prefix: "" - } - } - )", - port - ) - ); - env.TestWaitNotification(runtime, txId); + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); - auto* scheme = s3Mock.GetData().FindPtr("/scheme.pb"); + auto* scheme = S3Mock().GetData().FindPtr("/scheme.pb"); UNIT_ASSERT(scheme); CheckTableScheme(*scheme, GetIndexTablePartitioningSettings, CreateProtoComparator(R"( partitioning_by_size: DISABLED @@ -2355,23 +2227,16 @@ partitioning_settings { } Y_UNIT_TEST(UserSID) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + Env().TestWaitNotification(Runtime(), txId); const TString request = Sprintf(R"( ExportToS3Settings { @@ -2382,41 +2247,35 @@ partitioning_settings { destination_prefix: "" } } - )", port); + )", S3Port()); const TString userSID = "user@builtin"; - TestExport(runtime, ++txId, "/MyRoot", request, userSID); + TestExport(Runtime(), ++txId, "/MyRoot", request, userSID); - const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto desc = TestGetExport(Runtime(), txId, "/MyRoot"); const auto& entry = desc.GetResponse().GetEntry(); UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_PREPARING); UNIT_ASSERT_VALUES_EQUAL(entry.GetUserSID(), userSID); } Y_UNIT_TEST(TablePermissions) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true)); + EnvOptions().EnablePermissionsExport(true); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); NACLib::TDiffACL diffACL; diffACL.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericUse, "user@builtin", NACLib::InheritNone); - TestModifyACL(runtime, ++txId, "/MyRoot", "Table", diffACL.SerializeAsString(), "user@builtin"); - env.TestWaitNotification(runtime, txId); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); + TestModifyACL(Runtime(), ++txId, "/MyRoot", "Table", diffACL.SerializeAsString(), "user@builtin"); + Env().TestWaitNotification(Runtime(), txId); - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -2425,46 +2284,40 @@ partitioning_settings { destination_prefix: "" } } - )", port)); - env.TestWaitNotification(runtime, txId); + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); - auto* permissions = s3Mock.GetData().FindPtr("/permissions.pb"); + auto* permissions = S3Mock().GetData().FindPtr("/permissions.pb"); UNIT_ASSERT(permissions); CheckPermissions(*permissions, CreateProtoComparator(R"( actions { - change_owner: "user@builtin" + change_owner: "user@builtin" } actions { - grant { - subject: "user@builtin" - permission_names: "ydb.generic.use" - } + grant { + subject: "user@builtin" + permission_names: "ydb.generic.use" + } } )")); } Y_UNIT_TEST(Checksums) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true).EnableChecksumsExport(true)); + EnvOptions().EnablePermissionsExport(true).EnableChecksumsExport(true); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); + Env().TestWaitNotification(Runtime(), txId); - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -2473,59 +2326,53 @@ partitioning_settings { destination_prefix: "" } } - )", port)); - env.TestWaitNotification(runtime, txId); + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); - UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); - const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + UNIT_ASSERT_VALUES_EQUAL(S3Mock().GetData().size(), 8); + const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256"); UNIT_ASSERT(dataChecksum); UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); - const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + const auto* metadataChecksum = S3Mock().GetData().FindPtr("/metadata.json.sha256"); UNIT_ASSERT(metadataChecksum); UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); - const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + const auto* schemeChecksum = S3Mock().GetData().FindPtr("/scheme.pb.sha256"); UNIT_ASSERT(schemeChecksum); UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); - const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); + const auto* permissionsChecksum = S3Mock().GetData().FindPtr("/permissions.pb.sha256"); UNIT_ASSERT(permissionsChecksum); UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb"); } Y_UNIT_TEST(EnableChecksumsPersistance) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); + EnvOptions().EnableChecksumsExport(true); + Env(); // Init test env ui64 txId = 100; // Create test table - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); // Add some test data - UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); // Block sending backup task to datashards - TBlockEvents<TEvDataShard::TEvProposeTransaction> block(runtime, [](auto& ev) { + TBlockEvents<TEvDataShard::TEvProposeTransaction> block(Runtime(), [](auto& ev) { NKikimrTxDataShard::TFlatSchemeTransaction schemeTx; UNIT_ASSERT(schemeTx.ParseFromString(ev.Get()->Get()->GetTxBody())); return schemeTx.HasBackup(); }); // Start export and expect it to be blocked - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -2534,57 +2381,51 @@ partitioning_settings { destination_prefix: "" } } - )", port)); + )", S3Port())); - runtime.WaitFor("backup task is sent to datashards", [&]{ return block.size() >= 1; }); + Runtime().WaitFor("backup task is sent to datashards", [&]{ return block.size() >= 1; }); // Stop blocking new events block.Stop(); // Reboot SchemeShard to resend backup task - RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + RebootTablet(Runtime(), TTestTxConfig::SchemeShard, Runtime().AllocateEdgeActor()); // Wait for export to complete - env.TestWaitNotification(runtime, txId); + Env().TestWaitNotification(Runtime(), txId); // Verify checksums are created - UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 6); + UNIT_ASSERT_VALUES_EQUAL(S3Mock().GetData().size(), 6); - const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256"); UNIT_ASSERT(dataChecksum); UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); - const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + const auto* metadataChecksum = S3Mock().GetData().FindPtr("/metadata.json.sha256"); UNIT_ASSERT(metadataChecksum); UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); - const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + const auto* schemeChecksum = S3Mock().GetData().FindPtr("/scheme.pb.sha256"); UNIT_ASSERT(schemeChecksum); UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); } Y_UNIT_TEST(ChecksumsWithCompression) { - TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); + EnvOptions().EnableChecksumsExport(true); + Env(); // Init test env ui64 txId = 100; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); - env.TestWaitNotification(runtime, txId); - - UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); + Env().TestWaitNotification(Runtime(), txId); - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); + UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); - TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -2594,10 +2435,10 @@ partitioning_settings { } compression: "zstd" } - )", port)); - env.TestWaitNotification(runtime, txId); + )", S3Port())); + Env().TestWaitNotification(Runtime(), txId); - const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256"); UNIT_ASSERT(dataChecksum); UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); } @@ -2704,15 +2545,7 @@ attributes { }; Y_UNIT_TEST(Changefeeds) { - TTestBasicRuntime runtime; - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - ChangefeedGenerator gen(3, s3Mock); + ChangefeedGenerator gen(3, S3Mock()); auto request = Sprintf(R"( ExportToS3Settings { @@ -2723,12 +2556,13 @@ attributes { destination_prefix: "" } } - )", port); + )", S3Port()); - TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); - runtime.GetAppData().FeatureFlags.SetEnableChangefeedsExport(true); + EnvOptions().EnableChecksumsExport(true); + Env(); // Init test env + Runtime().GetAppData().FeatureFlags.SetEnableChangefeedsExport(true); - Run(runtime, env, TVector<TString>{ + Run(Runtime(), Env(), TVector<TString>{ R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } @@ -2741,9 +2575,7 @@ attributes { } Y_UNIT_TEST(SchemaMapping) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table1" Columns { Name: "key" Type: "Utf8" } @@ -2780,9 +2612,7 @@ attributes { } Y_UNIT_TEST(SchemaMappingEncryption) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table1" Columns { Name: "key" Type: "Utf8" } @@ -2824,9 +2654,7 @@ attributes { } Y_UNIT_TEST(SchemaMappingEncryptionIncorrectKey) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table1" Columns { Name: "key" Type: "Utf8" } @@ -2862,9 +2690,7 @@ attributes { } Y_UNIT_TEST(EncryptedExport) { - TTestBasicRuntime runtime; - - RunS3(runtime, { + RunS3({ R"( Name: "Table1" Columns { Name: "key" Type: "Uint32" } @@ -2899,15 +2725,17 @@ attributes { } )"); - UNIT_ASSERT(HasS3File("/my_export/metadata.json")); - UNIT_ASSERT(HasS3File("/my_export/SchemaMapping/metadata.json.enc")); - UNIT_ASSERT(HasS3File("/my_export/SchemaMapping/mapping.json.enc")); - UNIT_ASSERT(HasS3File("/my_export/001/scheme.pb.enc")); - UNIT_ASSERT(HasS3File("/my_export/001/data_00.csv.enc")); - UNIT_ASSERT(HasS3File("/my_export/001/data_01.csv.enc")); - UNIT_ASSERT(HasS3File("/my_export/002/scheme.pb.enc")); - UNIT_ASSERT(HasS3File("/my_export/002/data_00.csv.enc")); - UNIT_ASSERT(HasS3File("/my_export/002/data_01.csv.enc")); + CheckHasAllS3Files({ + "/my_export/metadata.json", + "/my_export/SchemaMapping/metadata.json.enc", + "/my_export/SchemaMapping/mapping.json.enc", + "/my_export/001/scheme.pb.enc", + "/my_export/001/data_00.csv.enc", + "/my_export/001/data_01.csv.enc", + "/my_export/002/scheme.pb.enc", + "/my_export/002/data_00.csv.enc", + "/my_export/002/data_01.csv.enc", + }); THashSet<TString> ivs; for (auto [key, content] : S3Mock().GetData()) { @@ -2932,15 +2760,6 @@ attributes { } Y_UNIT_TEST(AutoDropping) { - TTestBasicRuntime runtime; - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock({}, TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - auto request = Sprintf(R"( ExportToS3Settings { endpoint: "localhost:%d" @@ -2950,20 +2769,18 @@ attributes { destination_prefix: "" } } - )", port); + )", S3Port()); - TTestEnv env(runtime); - - Run(runtime, env, TVector<TString>{ + Run(Runtime(), Env(), TVector<TString>{ R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } - Columns { Name: "value" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )", }, request, Ydb::StatusIds::SUCCESS, "/MyRoot"); - auto desc = DescribePath(runtime, "/MyRoot"); + auto desc = DescribePath(Runtime(), "/MyRoot"); UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1); UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table"); } |