aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@ydb.tech>2025-04-22 13:20:37 +0300
committerGitHub <noreply@github.com>2025-04-22 13:20:37 +0300
commit63874fb7cb65c52166d07b61c93a3dba2cc80274 (patch)
tree94deaeb31d1d91e40880a69aa8e8c3aa3fb21181
parent73bc54207426e222268e0589263281497de02187 (diff)
downloadydb-63874fb7cb65c52166d07b61c93a3dba2cc80274.tar.gz
Refactor ut_export: move local scope test variables to fixture (#17461)
-rw-r--r--ydb/core/tx/schemeshard/ut_export/ut_export.cpp1823
1 files changed, 820 insertions, 1003 deletions
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
index 3974307eda4..fa38ffaba4d 100644
--- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
@@ -156,91 +156,6 @@ namespace {
TestGetExport(runtime, schemeshardId, exportId, dbName, Ydb::StatusIds::NOT_FOUND);
}
- using TDelayFunc = std::function<bool(TAutoPtr<IEventHandle>&)>;
-
- void Cancel(const TVector<TString>& tables, const TString& request, TDelayFunc delayFunc) {
- TTestBasicRuntime runtime;
- std::vector<std::string> auditLines;
- runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
-
- TTestEnv env(runtime);
- ui64 txId = 100;
-
- for (const auto& table : tables) {
- TestCreateTable(runtime, ++txId, "/MyRoot", table);
- env.TestWaitNotification(runtime, txId);
- }
-
- runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
-
- THolder<IEventHandle> delayed;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (delayFunc(ev)) {
- delayed.Reset(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- return TTestActorRuntime::EEventAction::PROCESS;
- });
-
- TestExport(runtime, ++txId, "/MyRoot", request);
- const ui64 exportId = txId;
-
- // Check audit record for export start
- {
- auto line = FindAuditLine(auditLines, "operation=EXPORT START");
- UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard");
- UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START");
- UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId));
- UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value
- UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}");
- UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot");
- UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS");
- UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS");
- UNIT_ASSERT(!line.contains("reason"));
- UNIT_ASSERT(!line.contains("start_time"));
- UNIT_ASSERT(!line.contains("end_time"));
- }
-
- if (!delayed) {
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
- return bool(delayed);
- });
- runtime.DispatchEvents(opts);
- }
-
- runtime.SetObserverFunc(prevObserver);
-
- TestCancelExport(runtime, ++txId, "/MyRoot", exportId);
- runtime.Send(delayed.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
-
- // Check audit record for export end
- //
- {
- auto line = FindAuditLine(auditLines, "operation=EXPORT END");
- UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard");
- UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END");
- UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId));
- UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value
- UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}");
- UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot");
- UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR");
- UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED");
- UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled");
- UNIT_ASSERT_STRING_CONTAINS(line, "start_time=");
- UNIT_ASSERT_STRING_CONTAINS(line, "end_time=");
- }
-
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
-
- TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
- env.TestWaitNotification(runtime, exportId);
-
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
- }
-
const Ydb::Table::PartitioningSettings& GetPartitioningSettings(
const Ydb::Table::CreateTableRequest& tableDescription
) {
@@ -295,15 +210,15 @@ namespace {
class TExportFixture : public NUnitTest::TBaseFixture {
public:
- void RunS3(TTestBasicRuntime& runtime, const TVector<TString>& tables, const TString& requestTpl, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) {
+ void RunS3(const TVector<TString>& tables, const TString& requestTpl, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS, bool checkS3FilesExistence = true) {
auto requestStr = Sprintf(requestTpl.c_str(), S3Port());
NKikimrExport::TCreateExportRequest request;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request));
- TTestEnv env(runtime);
- runtime.GetAppData().FeatureFlags.SetEnableEncryptedExport(true);
+ Env(); // Init test env
+ Runtime().GetAppData().FeatureFlags.SetEnableEncryptedExport(true);
- Run(runtime, env, tables, requestStr, expectedStatus, "/MyRoot", false);
+ Run(Runtime(), Env(), tables, requestStr, expectedStatus, "/MyRoot", false);
auto calcPath = [&](const TString& targetPath, const TString& file) {
TString canonPath = (targetPath.StartsWith("/") || targetPath.empty()) ? targetPath : TString("/") + targetPath;
@@ -316,7 +231,7 @@ namespace {
return result;
};
- if (expectedStatus == Ydb::StatusIds::SUCCESS) {
+ if (expectedStatus == Ydb::StatusIds::SUCCESS && checkS3FilesExistence) {
for (auto& path : GetExportTargetPaths(requestStr)) {
UNIT_ASSERT_C(HasS3File(calcPath(path, "metadata.json")), calcPath(path, "metadata.json"));
UNIT_ASSERT_C(HasS3File(calcPath(path, "scheme.pb")), calcPath(path, "scheme.pb"));
@@ -329,6 +244,29 @@ namespace {
return it != S3Mock().GetData().end();
}
+ template <class T>
+ void CheckHasAllS3Files(std::initializer_list<T> paths) {
+ for (const T& path : paths) {
+ UNIT_ASSERT_C(HasS3File(path), "Path \"" << path << "\" is expected to exist in S3");
+ }
+ }
+
+ template <class T>
+ void CheckNoSuchS3Files(std::initializer_list<T> paths) {
+ for (const T& path : paths) {
+ UNIT_ASSERT_C(!HasS3File(path), "Path \"" << path << "\" is expected not to exist in S3");
+ }
+ }
+
+ template <class T>
+ void CheckNoS3Prefix(std::initializer_list<T> prefixes) {
+ for (const T& prefix : prefixes) {
+ for (auto&& [fileName, _] : S3Mock().GetData()) {
+ UNIT_ASSERT_C(!fileName.StartsWith(prefix), "S3 path \"" << fileName << "\" has prefix \"" << prefix << "\", which is not expected prefix");
+ }
+ }
+ }
+
TString GetS3FileContent(const TString& path) {
auto it = S3Mock().GetData().find(path);
if (it != S3Mock().GetData().end()) {
@@ -344,11 +282,350 @@ namespace {
}
}
+ using TDelayFunc = std::function<bool(TAutoPtr<IEventHandle>&)>;
+
+ void Cancel(const TVector<TString>& tables, const TString& request, TDelayFunc delayFunc) {
+ std::vector<std::string> auditLines;
+ Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
+
+ Env(); // Init test env
+ ui64 txId = 100;
+
+ for (const auto& table : tables) {
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", table);
+ Env().TestWaitNotification(Runtime(), txId);
+ }
+
+ Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
+
+ THolder<IEventHandle> delayed;
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ if (delayFunc(ev)) {
+ delayed.Reset(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TestExport(Runtime(), ++txId, "/MyRoot", request);
+ const ui64 exportId = txId;
+
+ // Check audit record for export start
+ {
+ auto line = FindAuditLine(auditLines, "operation=EXPORT START");
+ UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard");
+ UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START");
+ UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId));
+ UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value
+ UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}");
+ UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot");
+ UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS");
+ UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS");
+ UNIT_ASSERT(!line.contains("reason"));
+ UNIT_ASSERT(!line.contains("start_time"));
+ UNIT_ASSERT(!line.contains("end_time"));
+ }
+
+ if (!delayed) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
+ return bool(delayed);
+ });
+ Runtime().DispatchEvents(opts);
+ }
+
+ Runtime().SetObserverFunc(prevObserver);
+
+ TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Runtime().Send(delayed.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ // Check audit record for export end
+ //
+ {
+ auto line = FindAuditLine(auditLines, "operation=EXPORT END");
+ UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard");
+ UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END");
+ UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId));
+ UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value
+ UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}");
+ UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot");
+ UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR");
+ UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED");
+ UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled");
+ UNIT_ASSERT_STRING_CONTAINS(line, "start_time=");
+ UNIT_ASSERT_STRING_CONTAINS(line, "end_time=");
+ }
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+
+ TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ }
+
+ void CancelUponTransferringShouldSucceed(const TVector<TString>& tables, const TString& request) {
+ Cancel(tables, Sprintf(request.c_str(), S3Port()), [](TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
+ return false;
+ }
+
+ return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record
+ .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup;
+ });
+ }
+
+ void CancelShouldSucceed(TDelayFunc delayFunc) {
+ Cancel({
+ R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )",
+ }, Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port()), delayFunc);
+ }
+
+ void DropCopiesBeforeTransferring(ui32 tablesCount) {
+ Env(); // Init test env
+ ui64 txId = 100;
+
+ for (ui32 i = 1; i <= tablesCount; ++i) {
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
+ Name: "Table%d"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", i));
+ Env().TestWaitNotification(Runtime(), txId);
+ }
+
+ Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
+
+ bool dropNotification = false;
+ THolder<IEventHandle> delayed;
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvSchemeShard::EvModifySchemeTransaction:
+ break;
+ case TEvSchemeShard::EvNotifyTxCompletionResult:
+ if (dropNotification) {
+ delayed.Reset(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ default:
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
+ if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
+ dropNotification = true;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TStringBuilder items;
+ for (ui32 i = 1; i <= tablesCount; ++i) {
+ items << "items {"
+ << " source_path: \"/MyRoot/Table" << i << "\""
+ << " destination_prefix: \"\""
+ << " }";
+ }
+
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ %s
+ }
+ )", S3Port(), items.c_str()));
+ const ui64 exportId = txId;
+
+ if (!delayed) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
+ return bool(delayed);
+ });
+ Runtime().DispatchEvents(opts);
+ }
+
+ Runtime().SetObserverFunc(prevObserver);
+
+ for (ui32 i = 0; i < tablesCount; ++i) {
+ TestDropTable(Runtime(), ++txId, Sprintf("/MyRoot/export-%" PRIu64, exportId), ToString(i));
+ Env().TestWaitNotification(Runtime(), txId);
+ }
+
+ Runtime().Send(delayed.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+
+ TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ }
+
+ void RebootDuringFinish(bool rejectUploadParts, Ydb::StatusIds::StatusCode expectedStatus) {
+ S3Settings().WithRejectUploadParts(rejectUploadParts);
+
+ Env(); // Init test env
+ ui64 txId = 100;
+
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ Env().TestWaitNotification(Runtime(), txId);
+
+ UpdateRow(Runtime(), "Table", 1, "valueA");
+ UpdateRow(Runtime(), "Table", 2, "valueB");
+
+ Runtime().SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
+
+ TMaybe<ui64> backupTxId;
+ TMaybe<ui64> tabletId;
+ bool delayed = false;
+
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvProposeTransaction: {
+ auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record;
+ if (record.GetTxKind() != NKikimrTxDataShard::ETransactionKind::TX_KIND_SCHEME) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ NKikimrTxDataShard::TFlatSchemeTransaction schemeTx;
+ UNIT_ASSERT(schemeTx.ParseFromString(record.GetTxBody()));
+
+ if (schemeTx.HasBackup()) {
+ backupTxId = record.GetTxId();
+ // hijack
+ schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1);
+ record.SetTxBody(schemeTx.SerializeAsString());
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ case TEvDataShard::EvProposeTransactionResult: {
+ if (!backupTxId) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ const auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record;
+ if (record.GetTxId() != *backupTxId) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ tabletId = record.GetOrigin();
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+
+ case NWrappers::NExternalStorage::EvCompleteMultipartUploadRequest:
+ case NWrappers::NExternalStorage::EvAbortMultipartUploadRequest:
+ delayed = true;
+ return TTestActorRuntime::EEventAction::DROP;
+
+ default:
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ });
+
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port()));
+ const ui64 exportId = txId;
+
+ if (!delayed || !tabletId) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed, &tabletId](IEventHandle&) -> bool {
+ return delayed && tabletId;
+ });
+ Runtime().DispatchEvents(opts);
+ }
+
+ Runtime().SetObserverFunc(prevObserver);
+
+ RebootTablet(Runtime(), *tabletId, Runtime().AllocateEdgeActor());
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", expectedStatus);
+
+ TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
+
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ }
+
+ void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) {
+ const TString userSID = "user@builtin";
+ EnvOptions().SystemBackupSIDs({userSID});
+ Env(); // Init test env
+
+ SetSchemeshardSchemaLimits(Runtime(), limits);
+
+ const TVector<TString> tables = {
+ R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )",
+ };
+ const TString request = Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port());
+
+ Run(Runtime(), Env(), tables, request, expectedFailStatus);
+ Run(Runtime(), Env(), tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID);
+ }
+
protected:
+ TS3Mock::TSettings& S3Settings() {
+ if (!S3ServerSettings) {
+ S3ServerPort = PortManager.GetPort();
+ S3ServerSettings.ConstructInPlace(S3ServerPort);
+ }
+ return *S3ServerSettings;
+ }
+
TS3Mock& S3Mock() {
if (!S3ServerMock) {
- S3ServerPort = PortManager.GetPort();
- S3ServerMock.ConstructInPlace(TS3Mock::TSettings(S3ServerPort));
+ S3ServerMock.ConstructInPlace(S3Settings());
UNIT_ASSERT(S3ServerMock->Start());
}
return *S3ServerMock;
@@ -359,19 +636,42 @@ namespace {
return S3ServerPort;
}
+ TTestBasicRuntime& Runtime() {
+ if (!TestRuntime) {
+ TestRuntime.ConstructInPlace();
+ }
+ return *TestRuntime;
+ }
+
+ TTestEnvOptions& EnvOptions() {
+ if (!TestEnvOptions) {
+ TestEnvOptions.ConstructInPlace();
+ }
+ return *TestEnvOptions;
+ }
+
+ TTestEnv& Env() {
+ if (!TestEnv) {
+ TestEnv.ConstructInPlace(Runtime(), EnvOptions());
+ }
+ return *TestEnv;
+ }
+
private:
TPortManager PortManager;
ui16 S3ServerPort = 0;
+ TMaybe<TTestBasicRuntime> TestRuntime;
+ TMaybe<TS3Mock::TSettings> S3ServerSettings;
TMaybe<TS3Mock> S3ServerMock;
+ TMaybe<TTestEnvOptions> TestEnvOptions;
+ TMaybe<TTestEnv> TestEnv;
};
} // anonymous
Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
@@ -391,9 +691,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
}
Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
@@ -414,9 +712,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
}
Y_UNIT_TEST(ShouldSucceedOnManyTables) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table1"
Columns { Name: "key" Type: "Utf8" }
@@ -446,52 +742,43 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
}
Y_UNIT_TEST(ShouldOmitNonStrictStorageSettings) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
-
const TVector<TString> tables = {R"(
Name: "Table"
Columns {
- Name: "key"
- Type: "Utf8"
- DefaultFromLiteral {
- type {
- optional_type {
- item {
- type_id: UTF8
- }
- }
- }
- value {
- items {
- text_value: "b"
- }
+ Name: "key"
+ Type: "Utf8"
+ DefaultFromLiteral {
+ type {
+ optional_type {
+ item {
+ type_id: UTF8
}
+ }
+ }
+ value {
+ items {
+ text_value: "b"
+ }
}
+ }
}
Columns {
- Name: "value"
- Type: "Utf8"
- DefaultFromLiteral {
- type {
- optional_type {
- item {
- type_id: UTF8
- }
- }
- }
- value {
- items {
- text_value: "a"
- }
+ Name: "value"
+ Type: "Utf8"
+ DefaultFromLiteral {
+ type {
+ optional_type {
+ item {
+ type_id: UTF8
}
+ }
}
+ value {
+ items {
+ text_value: "a"
+ }
+ }
+ }
}
KeyColumnNames: ["key"]
PartitionConfig {
@@ -515,7 +802,7 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
}
)"};
- Run(runtime, env, tables, Sprintf(R"(
+ Run(Runtime(), Env(), tables, Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -524,10 +811,10 @@ Y_UNIT_TEST_SUITE_F(TExportToS3Tests, TExportFixture) {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
- auto schemeIt = s3Mock.GetData().find("/scheme.pb");
- UNIT_ASSERT(schemeIt != s3Mock.GetData().end());
+ auto schemeIt = S3Mock().GetData().find("/scheme.pb");
+ UNIT_ASSERT(schemeIt != S3Mock().GetData().end());
TString scheme = schemeIt->second;
@@ -596,54 +883,45 @@ partitioning_settings {
}
Y_UNIT_TEST(ShouldPreserveIncrBackupFlag) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
-
const TTablesWithAttrs tables{
{
R"(
Name: "Table"
Columns {
- Name: "key"
- Type: "Utf8"
- DefaultFromLiteral {
- type {
- optional_type {
- item {
- type_id: UTF8
- }
- }
- }
- value {
- items {
- text_value: "b"
- }
+ Name: "key"
+ Type: "Utf8"
+ DefaultFromLiteral {
+ type {
+ optional_type {
+ item {
+ type_id: UTF8
}
+ }
}
+ value {
+ items {
+ text_value: "b"
+ }
+ }
+ }
}
Columns {
- Name: "value"
- Type: "Utf8"
- DefaultFromLiteral {
- type {
- optional_type {
- item {
- type_id: UTF8
- }
- }
- }
- value {
- items {
- text_value: "a"
- }
+ Name: "value"
+ Type: "Utf8"
+ DefaultFromLiteral {
+ type {
+ optional_type {
+ item {
+ type_id: UTF8
}
+ }
}
+ value {
+ items {
+ text_value: "a"
+ }
+ }
+ }
}
KeyColumnNames: ["key"]
)",
@@ -651,7 +929,7 @@ partitioning_settings {
},
};
- Run(runtime, env, tables, Sprintf(R"(
+ Run(Runtime(), Env(), tables, Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -660,10 +938,10 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
- auto schemeIt = s3Mock.GetData().find("/scheme.pb");
- UNIT_ASSERT(schemeIt != s3Mock.GetData().end());
+ auto schemeIt = S3Mock().GetData().find("/scheme.pb");
+ UNIT_ASSERT(schemeIt != S3Mock().GetData().end());
TString scheme = schemeIt->second;
@@ -728,32 +1006,6 @@ partitioning_settings {
)");
}
- void CancelShouldSucceed(TDelayFunc delayFunc) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- Cancel({
- R"(
- Name: "Table"
- Columns { Name: "key" Type: "Utf8" }
- Columns { Name: "value" Type: "Utf8" }
- KeyColumnNames: ["key"]
- )",
- }, Sprintf(R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )", port), delayFunc);
- }
-
Y_UNIT_TEST(CancelUponCreatingExportDirShouldSucceed) {
CancelShouldSucceed([](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
@@ -776,23 +1028,6 @@ partitioning_settings {
});
}
- void CancelUponTransferringShouldSucceed(const TVector<TString>& tables, const TString& request) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- Cancel(tables, Sprintf(request.c_str(), port), [](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
- return false;
- }
-
- return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record
- .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup;
- });
- }
-
Y_UNIT_TEST(CancelUponTransferringSingleShardTableShouldSucceed) {
CancelUponTransferringShouldSucceed({
R"(
@@ -869,24 +1104,23 @@ partitioning_settings {
}
Y_UNIT_TEST(DropSourceTableBeforeTransferring) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
- runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
+ Runtime().SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
bool dropNotification = false;
THolder<IEventHandle> delayed;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvSchemeShard::EvModifySchemeTransaction:
break;
@@ -908,13 +1142,7 @@ partitioning_settings {
return TTestActorRuntime::EEventAction::PROCESS;
});
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -923,96 +1151,7 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
- const ui64 exportId = txId;
-
- if (!delayed) {
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
- return bool(delayed);
- });
- runtime.DispatchEvents(opts);
- }
-
- runtime.SetObserverFunc(prevObserver);
-
- TestDropTable(runtime, ++txId, "/MyRoot", "Table");
- env.TestWaitNotification(runtime, txId);
-
- runtime.Send(delayed.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
-
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
-
- TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
- env.TestWaitNotification(runtime, exportId);
-
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
- }
-
- void DropCopiesBeforeTransferring(ui32 tablesCount) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
- ui64 txId = 100;
-
- for (ui32 i = 1; i <= tablesCount; ++i) {
- TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"(
- Name: "Table%d"
- Columns { Name: "key" Type: "Utf8" }
- Columns { Name: "value" Type: "Utf8" }
- KeyColumnNames: ["key"]
- )", i));
- env.TestWaitNotification(runtime, txId);
- }
-
- runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
-
- bool dropNotification = false;
- THolder<IEventHandle> delayed;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- switch (ev->GetTypeRewrite()) {
- case TEvSchemeShard::EvModifySchemeTransaction:
- break;
- case TEvSchemeShard::EvNotifyTxCompletionResult:
- if (dropNotification) {
- delayed.Reset(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- return TTestActorRuntime::EEventAction::PROCESS;
- default:
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
- if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
- dropNotification = true;
- }
-
- return TTestActorRuntime::EEventAction::PROCESS;
- });
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TStringBuilder items;
- for (ui32 i = 1; i <= tablesCount; ++i) {
- items << "items {"
- << " source_path: \"/MyRoot/Table" << i << "\""
- << " destination_prefix: \"\""
- << " }";
- }
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- %s
- }
- )", port, items.c_str()));
+ )", S3Port()));
const ui64 exportId = txId;
if (!delayed) {
@@ -1020,25 +1159,23 @@ partitioning_settings {
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
return bool(delayed);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(prevObserver);
+ Runtime().SetObserverFunc(prevObserver);
- for (ui32 i = 0; i < tablesCount; ++i) {
- TestDropTable(runtime, ++txId, Sprintf("/MyRoot/export-%" PRIu64, exportId), ToString(i));
- env.TestWaitNotification(runtime, txId);
- }
+ TestDropTable(Runtime(), ++txId, "/MyRoot", "Table");
+ Env().TestWaitNotification(Runtime(), txId);
- runtime.Send(delayed.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
+ Runtime().Send(delayed.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
- TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
- env.TestWaitNotification(runtime, exportId);
+ TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
}
Y_UNIT_TEST(DropCopiesBeforeTransferring1) {
@@ -1049,114 +1186,6 @@ partitioning_settings {
DropCopiesBeforeTransferring(2);
}
- void RebootDuringFinish(bool rejectUploadParts, Ydb::StatusIds::StatusCode expectedStatus) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
- ui64 txId = 100;
-
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
- Name: "Table"
- Columns { Name: "key" Type: "Uint32" }
- Columns { Name: "value" Type: "Utf8" }
- KeyColumnNames: ["key"]
- )");
- env.TestWaitNotification(runtime, txId);
-
- UpdateRow(runtime, "Table", 1, "valueA");
- UpdateRow(runtime, "Table", 2, "valueB");
-
- runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
-
- TMaybe<ui64> backupTxId;
- TMaybe<ui64> tabletId;
- bool delayed = false;
-
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- switch (ev->GetTypeRewrite()) {
- case TEvDataShard::EvProposeTransaction: {
- auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record;
- if (record.GetTxKind() != NKikimrTxDataShard::ETransactionKind::TX_KIND_SCHEME) {
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- NKikimrTxDataShard::TFlatSchemeTransaction schemeTx;
- UNIT_ASSERT(schemeTx.ParseFromString(record.GetTxBody()));
-
- if (schemeTx.HasBackup()) {
- backupTxId = record.GetTxId();
- // hijack
- schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1);
- record.SetTxBody(schemeTx.SerializeAsString());
- }
-
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- case TEvDataShard::EvProposeTransactionResult: {
- if (!backupTxId) {
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- const auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record;
- if (record.GetTxId() != *backupTxId) {
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- tabletId = record.GetOrigin();
- return TTestActorRuntime::EEventAction::PROCESS;
- }
-
- case NWrappers::NExternalStorage::EvCompleteMultipartUploadRequest:
- case NWrappers::NExternalStorage::EvAbortMultipartUploadRequest:
- delayed = true;
- return TTestActorRuntime::EEventAction::DROP;
-
- default:
- return TTestActorRuntime::EEventAction::PROCESS;
- }
- });
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port).WithRejectUploadParts(rejectUploadParts));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )", port));
- const ui64 exportId = txId;
-
- if (!delayed || !tabletId) {
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back([&delayed, &tabletId](IEventHandle&) -> bool {
- return delayed && tabletId;
- });
- runtime.DispatchEvents(opts);
- }
-
- runtime.SetObserverFunc(prevObserver);
-
- RebootTablet(runtime, *tabletId, runtime.AllocateEdgeActor());
- env.TestWaitNotification(runtime, exportId);
-
- TestGetExport(runtime, exportId, "/MyRoot", expectedStatus);
-
- TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
- env.TestWaitNotification(runtime, exportId);
-
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
- }
-
Y_UNIT_TEST(RebootDuringCompletion) {
RebootDuringFinish(false, Ydb::StatusIds::SUCCESS);
}
@@ -1166,12 +1195,12 @@ partitioning_settings {
}
Y_UNIT_TEST(ShouldExcludeBackupTableFromStats) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true));
+ EnvOptions().DisableStatsBatching(true);
+ Env(); // Init test env
ui64 txId = 100;
THashSet<ui64> statsCollected;
- runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvDataShard::EvPeriodicTableStats) {
statsCollected.insert(ev->Get<TEvDataShard::TEvPeriodicTableStats>()->Record.GetDatashardId());
}
@@ -1186,41 +1215,35 @@ partitioning_settings {
opts.FinalEvents.emplace_back([&](IEventHandle&) -> bool {
return statsCollected.size() == count;
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
- return DescribePath(runtime, "/MyRoot")
+ return DescribePath(Runtime(), "/MyRoot")
.GetPathDescription()
.GetDomainDescription()
.GetDiskSpaceUsage();
};
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
for (int i = 1; i < 500; ++i) {
- UpdateRow(runtime, "Table", i, "value");
+ UpdateRow(Runtime(), "Table", i, "value");
}
// trigger memtable's compaction
- TestCopyTable(runtime, ++txId, "/MyRoot", "CopyTable", "/MyRoot/Table");
- env.TestWaitNotification(runtime, txId);
- TestDropTable(runtime, ++txId, "/MyRoot", "Table");
- env.TestWaitNotification(runtime, txId);
+ TestCopyTable(Runtime(), ++txId, "/MyRoot", "CopyTable", "/MyRoot/Table");
+ Env().TestWaitNotification(Runtime(), txId);
+ TestDropTable(Runtime(), ++txId, "/MyRoot", "Table");
+ Env().TestWaitNotification(Runtime(), txId);
const auto expected = waitForStats(1);
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1229,15 +1252,15 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
const ui64 exportId = txId;
::NKikimrSubDomains::TDiskSpaceUsage afterExport;
TTestActorRuntime::TEventObserver prevObserverFunc;
- prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
+ prevObserverFunc = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
if (auto* p = event->CastAsLocal<TEvSchemeShard::TEvModifySchemeTransaction>()) {
auto& record = p->Record;
- if (record.TransactionSize() >= 1 &&
+ if (record.TransactionSize() >= 1 &&
record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpDropTable) {
afterExport = waitForStats(2);
}
@@ -1245,50 +1268,43 @@ partitioning_settings {
return prevObserverFunc(event);
});
- env.TestWaitNotification(runtime, exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterExport.DebugString());
- TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
- env.TestWaitNotification(runtime, exportId);
+ TestForgetExport(Runtime(), ++txId, "/MyRoot", exportId);
+ Env().TestWaitNotification(Runtime(), exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
const auto afterForget = waitForStats(1);
UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterForget.DebugString());
}
Y_UNIT_TEST(CheckItemProgress) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(runtime, [](auto&& ev) {
+ TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(Runtime(), [](auto&& ev) {
return ev->Get()->Request.GetKey() == "/data_01.csv";
});
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 10 } }}}
)");
- env.TestWaitNotification(runtime, txId);
-
- WriteRow(runtime, ++txId, "/MyRoot/Table", 0, 1, "v1");
- env.TestWaitNotification(runtime, txId);
- WriteRow(runtime, ++txId, "/MyRoot/Table", 1, 100, "v100");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
+ Env().TestWaitNotification(Runtime(), txId);
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ WriteRow(Runtime(), ++txId, "/MyRoot/Table", 0, 1, "v1");
+ Env().TestWaitNotification(Runtime(), txId);
+ WriteRow(Runtime(), ++txId, "/MyRoot/Table", 1, 100, "v100");
+ Env().TestWaitNotification(Runtime(), txId);
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1297,14 +1313,13 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
-
+ )", S3Port()));
- runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; });
+ Runtime().WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; });
bool isCompleted = false;
while (!isCompleted) {
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto entry = desc.GetResponse().GetEntry();
const auto& item = entry.GetItemsProgress(0);
@@ -1314,44 +1329,37 @@ partitioning_settings {
UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1);
UNIT_ASSERT(item.has_start_time());
} else {
- runtime.SimulateSleep(TDuration::Seconds(1));
+ Runtime().SimulateSleep(TDuration::Seconds(1));
}
}
blockPartition01.Stop();
blockPartition01.Unblock();
-
- env.TestWaitNotification(runtime, txId);
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ Env().TestWaitNotification(Runtime(), txId);
+
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1);
}
Y_UNIT_TEST(ShouldRestartOnScanErrors) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- UpdateRow(runtime, "Table", 1, "valueA");
+ Env().TestWaitNotification(Runtime(), txId);
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ UpdateRow(Runtime(), "Table", 1, "valueA");
THolder<IEventHandle> injectResult;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == NSharedCache::EvResult) {
const auto* msg = ev->Get<NSharedCache::TEvResult>();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK);
@@ -1366,7 +1374,7 @@ partitioning_settings {
return TTestActorRuntime::EEventAction::PROCESS;
});
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1375,44 +1383,37 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
if (!injectResult) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool {
return bool(injectResult);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(prevObserver);
- runtime.Send(injectResult.Release(), 0, true);
+ Runtime().SetObserverFunc(prevObserver);
+ Runtime().Send(injectResult.Release(), 0, true);
- env.TestWaitNotification(runtime, txId);
- TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ Env().TestWaitNotification(Runtime(), txId);
+ TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::SUCCESS);
}
Y_UNIT_TEST(ShouldSucceedOnConcurrentTxs) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
THolder<IEventHandle> copyTables;
- auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) {
const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record;
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
@@ -1424,7 +1425,7 @@ partitioning_settings {
});
const auto exportId = ++txId;
- TestExport(runtime, exportId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), exportId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1433,18 +1434,18 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
if (!copyTables) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&copyTables](IEventHandle&) -> bool {
return bool(copyTables);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
THolder<IEventHandle> proposeTxResult;
- runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvDataShard::EvProposeTransactionResult) {
proposeTxResult.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -1452,7 +1453,7 @@ partitioning_settings {
return TTestActorRuntime::EEventAction::PROCESS;
});
- TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ TestAlterTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "extra" Type: "Utf8"}
)");
@@ -1462,39 +1463,32 @@ partitioning_settings {
opts.FinalEvents.emplace_back([&proposeTxResult](IEventHandle&) -> bool {
return bool(proposeTxResult);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(origObserver);
- runtime.Send(copyTables.Release(), 0, true);
- runtime.Send(proposeTxResult.Release(), 0, true);
- env.TestWaitNotification(runtime, txId);
+ Runtime().SetObserverFunc(origObserver);
+ Runtime().Send(copyTables.Release(), 0, true);
+ Runtime().Send(proposeTxResult.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), txId);
- env.TestWaitNotification(runtime, exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ Env().TestWaitNotification(Runtime(), exportId);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
}
Y_UNIT_TEST(ShouldSucceedOnConcurrentExport) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
TVector<THolder<IEventHandle>> copyTables;
- auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) {
const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record;
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
@@ -1504,20 +1498,20 @@ partitioning_settings {
}
return TTestActorRuntime::EEventAction::PROCESS;
});
- auto waitCopyTables = [&runtime, &copyTables](ui32 size) {
+ auto waitCopyTables = [this, &copyTables](ui32 size) {
if (copyTables.size() != size) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&copyTables, size](IEventHandle&) -> bool {
return copyTables.size() == size;
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
};
TVector<ui64> exportIds;
for (ui32 i = 1; i <= 3; ++i) {
exportIds.push_back(++txId);
- TestExport(runtime, exportIds[i - 1], "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), exportIds[i - 1], "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1526,42 +1520,35 @@ partitioning_settings {
destination_prefix: "Table%u"
}
}
- )", port, i));
+ )", S3Port(), i));
waitCopyTables(i);
}
- runtime.SetObserverFunc(origObserver);
+ Runtime().SetObserverFunc(origObserver);
for (auto& ev : copyTables) {
- runtime.Send(ev.Release(), 0, true);
+ Runtime().Send(ev.Release(), 0, true);
}
for (ui64 exportId : exportIds) {
- env.TestWaitNotification(runtime, exportId);
- TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ Env().TestWaitNotification(Runtime(), exportId);
+ TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
}
}
Y_UNIT_TEST(ShouldSucceedOnConcurrentImport) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
// prepare backup data
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1570,12 +1557,12 @@ partitioning_settings {
destination_prefix: "Backup1"
}
}
- )", port));
- env.TestWaitNotification(runtime, txId);
- TestGetExport(runtime, txId, "/MyRoot");
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
+ TestGetExport(Runtime(), txId, "/MyRoot");
TVector<THolder<IEventHandle>> delayed;
- auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto origObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) {
const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record;
const auto opType = record.GetTransaction(0).GetOperationType();
@@ -1591,18 +1578,18 @@ partitioning_settings {
return TTestActorRuntime::EEventAction::PROCESS;
});
- auto waitForDelayed = [&runtime, &delayed](ui32 size) {
+ auto waitForDelayed = [this, &delayed](ui32 size) {
if (delayed.size() != size) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&delayed, size](IEventHandle&) -> bool {
return delayed.size() == size;
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
};
const auto importId = ++txId;
- TestImport(runtime, importId, "/MyRoot", Sprintf(R"(
+ TestImport(Runtime(), importId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1611,12 +1598,12 @@ partitioning_settings {
destination_path: "/MyRoot/Restored"
}
}
- )", port));
+ )", S3Port()));
// wait for restore op
waitForDelayed(1);
const auto exportId = ++txId;
- TestExport(runtime, exportId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), exportId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1625,87 +1612,47 @@ partitioning_settings {
destination_prefix: "Backup2"
}
}
- )", port));
+ )", S3Port()));
// wait for copy table op
waitForDelayed(2);
- runtime.SetObserverFunc(origObserver);
+ Runtime().SetObserverFunc(origObserver);
for (auto& ev : delayed) {
- runtime.Send(ev.Release(), 0, true);
+ Runtime().Send(ev.Release(), 0, true);
}
- env.TestWaitNotification(runtime, importId);
- TestGetImport(runtime, importId, "/MyRoot");
- env.TestWaitNotification(runtime, exportId);
- TestGetExport(runtime, exportId, "/MyRoot");
+ Env().TestWaitNotification(Runtime(), importId);
+ TestGetImport(Runtime(), importId, "/MyRoot");
+ Env().TestWaitNotification(Runtime(), exportId);
+ TestGetExport(Runtime(), exportId, "/MyRoot");
}
- void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- const TString userSID = "user@builtin";
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().SystemBackupSIDs({userSID}));
-
- SetSchemeshardSchemaLimits(runtime, limits);
-
- const TVector<TString> tables = {
- R"(
- Name: "Table"
- Columns { Name: "key" Type: "Utf8" }
- Columns { Name: "value" Type: "Utf8" }
- KeyColumnNames: ["key"]
- )",
- };
- const TString request = Sprintf(R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )", port);
-
- Run(runtime, env, tables, request, expectedFailStatus);
- Run(runtime, env, tables, request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID);
+ Y_UNIT_TEST(ShouldCheckQuotasExportsLimited) {
+ ShouldCheckQuotas(TSchemeLimits{.MaxExports = 0}, Ydb::StatusIds::PRECONDITION_FAILED);
}
- Y_UNIT_TEST(ShouldCheckQuotas) {
- ShouldCheckQuotas(TSchemeLimits{.MaxExports = 0}, Ydb::StatusIds::PRECONDITION_FAILED);
+ Y_UNIT_TEST(ShouldCheckQuotasChildrenLimited) {
ShouldCheckQuotas(TSchemeLimits{.MaxChildrenInDir = 1}, Ydb::StatusIds::CANCELLED);
}
Y_UNIT_TEST(ShouldRetryAtFinalStage) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- UpdateRow(runtime, "Table", 1, "valueA");
- UpdateRow(runtime, "Table", 2, "valueB");
- runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
+ Env().TestWaitNotification(Runtime(), txId);
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ UpdateRow(Runtime(), "Table", 1, "valueA");
+ UpdateRow(Runtime(), "Table", 2, "valueB");
+ Runtime().SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);
THolder<IEventHandle> injectResult;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvDataShard::EvProposeTransaction: {
auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record;
@@ -1742,7 +1689,7 @@ partitioning_settings {
});
const auto exportId = ++txId;
- TestExport(runtime, txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1752,78 +1699,65 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
if (!injectResult) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool {
return bool(injectResult);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(prevObserver);
- runtime.Send(injectResult.Release(), 0, true);
+ Runtime().SetObserverFunc(prevObserver);
+ Runtime().Send(injectResult.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
- TestGetExport(runtime, exportId, "/MyRoot");
+ Env().TestWaitNotification(Runtime(), exportId);
+ TestGetExport(Runtime(), exportId, "/MyRoot");
}
Y_UNIT_TEST(CorruptedDyNumber) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true));
+ EnvOptions().DisableStatsBatching(true);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "DyNumber" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
// Write bad DyNumber
- UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
+ UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )", port));
- env.TestWaitNotification(runtime, txId);
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
- TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
}
Y_UNIT_TEST(UidAsIdempotencyKey) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
const auto request = Sprintf(R"(
OperationParams {
@@ -1840,41 +1774,34 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port);
+ )", S3Port());
// create operation
- TestExport(runtime, ++txId, "/MyRoot", request);
+ TestExport(Runtime(), ++txId, "/MyRoot", request);
const ui64 exportId = txId;
// create operation again with same uid
- TestExport(runtime, ++txId, "/MyRoot", request);
+ TestExport(Runtime(), ++txId, "/MyRoot", request);
// new operation was not created
- TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
+ TestGetExport(Runtime(), txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
// check previous operation
- TestGetExport(runtime, exportId, "/MyRoot");
- env.TestWaitNotification(runtime, exportId);
+ TestGetExport(Runtime(), exportId, "/MyRoot");
+ Env().TestWaitNotification(Runtime(), exportId);
}
Y_UNIT_TEST(ExportStartTime) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
- runtime.UpdateCurrentTime(TInstant::Now());
+ Env(); // Init test env
+ Runtime().UpdateCurrentTime(TInstant::Now());
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1883,9 +1810,9 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto& entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_PREPARING);
UNIT_ASSERT(entry.HasStartTime());
@@ -1893,26 +1820,19 @@ partitioning_settings {
}
Y_UNIT_TEST(CompletedExportEndTime) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
- runtime.UpdateCurrentTime(TInstant::Now());
+ Env(); // Init test env
+ Runtime().UpdateCurrentTime(TInstant::Now());
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1921,13 +1841,13 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
- runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
+ Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto& entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_DONE);
UNIT_ASSERT(entry.HasStartTime());
@@ -1936,18 +1856,17 @@ partitioning_settings {
}
Y_UNIT_TEST(CancelledExportEndTime) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
- runtime.UpdateCurrentTime(TInstant::Now());
+ Env(); // Init test env
+ Runtime().UpdateCurrentTime(TInstant::Now());
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
auto delayFunc = [](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
@@ -1959,7 +1878,7 @@ partitioning_settings {
};
THolder<IEventHandle> delayed;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (delayFunc(ev)) {
delayed.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -1967,13 +1886,7 @@ partitioning_settings {
return TTestActorRuntime::EEventAction::PROCESS;
});
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -1982,32 +1895,32 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
const ui64 exportId = txId;
- runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
+ Runtime().AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
if (!delayed) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
return bool(delayed);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(prevObserver);
+ Runtime().SetObserverFunc(prevObserver);
- TestCancelExport(runtime, ++txId, "/MyRoot", exportId);
+ TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId);
- auto desc = TestGetExport(runtime, exportId, "/MyRoot");
+ auto desc = TestGetExport(Runtime(), exportId, "/MyRoot");
auto entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLATION);
UNIT_ASSERT(entry.HasStartTime());
UNIT_ASSERT(!entry.HasEndTime());
- runtime.Send(delayed.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
+ Runtime().Send(delayed.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), exportId);
- desc = TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ desc = TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLED);
UNIT_ASSERT(entry.HasStartTime());
@@ -2017,50 +1930,41 @@ partitioning_settings {
// Based on CompletedExportEndTime
Y_UNIT_TEST(AuditCompletedExport) {
- TTestBasicRuntime runtime;
std::vector<std::string> auditLines;
- runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
-
- TTestEnv env(runtime);
-
- runtime.UpdateCurrentTime(TInstant::Now());
+ Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
+ Env(); // Init test env
+ Runtime().UpdateCurrentTime(TInstant::Now());
ui64 txId = 100;
// Prepare table to export
//
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
// Start export
//
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
const auto request = Sprintf(R"(
OperationParams {
- labels {
- key: "uid"
- value: "foo"
- }
+ labels {
+ key: "uid"
+ value: "foo"
+ }
}
ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
}
- )", port);
- TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876");
+ )", S3Port());
+ TestExport(Runtime(), ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876");
// Check audit record for export start
{
@@ -2081,11 +1985,11 @@ partitioning_settings {
// Do export
//
- runtime.AdvanceCurrentTime(TDuration::Seconds(30));
+ Runtime().AdvanceCurrentTime(TDuration::Seconds(30));
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto& entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_DONE);
UNIT_ASSERT(entry.HasStartTime());
@@ -2111,24 +2015,21 @@ partitioning_settings {
}
Y_UNIT_TEST(AuditCancelledExport) {
- TTestBasicRuntime runtime;
std::vector<std::string> auditLines;
- runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
-
- TTestEnv env(runtime);
-
- runtime.UpdateCurrentTime(TInstant::Now());
+ Runtime().AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines));
+ Env(); // Init test env
+ Runtime().UpdateCurrentTime(TInstant::Now());
ui64 txId = 100;
// Prepare table to export
//
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
auto delayFunc = [](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
@@ -2140,7 +2041,7 @@ partitioning_settings {
};
THolder<IEventHandle> delayed;
- auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ auto prevObserver = Runtime().SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (delayFunc(ev)) {
delayed.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
@@ -2150,18 +2051,12 @@ partitioning_settings {
// Start export
//
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
const auto request = Sprintf(R"(
OperationParams {
- labels {
- key: "uid"
- value: "foo"
- }
+ labels {
+ key: "uid"
+ value: "foo"
+ }
}
ExportToS3Settings {
endpoint: "localhost:%d"
@@ -2171,8 +2066,8 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port);
- TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876");
+ )", S3Port());
+ TestExport(Runtime(), ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876");
const ui64 exportId = txId;
// Check audit record for export start
@@ -2194,31 +2089,31 @@ partitioning_settings {
// Do export (unsuccessfully)
//
- runtime.AdvanceCurrentTime(TDuration::Seconds(30));
+ Runtime().AdvanceCurrentTime(TDuration::Seconds(30));
if (!delayed) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
return bool(delayed);
});
- runtime.DispatchEvents(opts);
+ Runtime().DispatchEvents(opts);
}
- runtime.SetObserverFunc(prevObserver);
+ Runtime().SetObserverFunc(prevObserver);
// Cancel export mid-air
//
- TestCancelExport(runtime, ++txId, "/MyRoot", exportId);
+ TestCancelExport(Runtime(), ++txId, "/MyRoot", exportId);
- auto desc = TestGetExport(runtime, exportId, "/MyRoot");
+ auto desc = TestGetExport(Runtime(), exportId, "/MyRoot");
auto entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLATION);
UNIT_ASSERT(entry.HasStartTime());
UNIT_ASSERT(!entry.HasEndTime());
- runtime.Send(delayed.Release(), 0, true);
- env.TestWaitNotification(runtime, exportId);
+ Runtime().Send(delayed.Release(), 0, true);
+ Env().TestWaitNotification(Runtime(), exportId);
- desc = TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ desc = TestGetExport(Runtime(), exportId, "/MyRoot", Ydb::StatusIds::CANCELLED);
entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLED);
UNIT_ASSERT(entry.HasStartTime());
@@ -2245,47 +2140,35 @@ partitioning_settings {
}
Y_UNIT_TEST(ExportPartitioningSettings) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
-
- Run(runtime, env, TVector<TString>{
+ Run(Runtime(), Env(), TVector<TString>{
R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
PartitionConfig {
- PartitioningPolicy {
- MinPartitionsCount: 10
- SplitByLoadSettings: {
- Enabled: true
- }
+ PartitioningPolicy {
+ MinPartitionsCount: 10
+ SplitByLoadSettings: {
+ Enabled: true
}
+ }
}
)"
},
- Sprintf(
- R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )",
- port
- )
+ Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port())
);
- auto* scheme = s3Mock.GetData().FindPtr("/scheme.pb");
+ auto* scheme = S3Mock().GetData().FindPtr("/scheme.pb");
UNIT_ASSERT(scheme);
CheckTableScheme(*scheme, GetPartitioningSettings, CreateProtoComparator(R"(
partitioning_by_size: DISABLED
@@ -2295,57 +2178,46 @@ partitioning_settings {
}
Y_UNIT_TEST(ExportIndexTablePartitioningSettings) {
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateIndexedTable(Runtime(), ++txId, "/MyRoot", R"(
TableDescription {
- Name: "Table"
- Columns { Name: "key" Type: "Uint32" }
- Columns { Name: "value" Type: "Utf8" }
- KeyColumnNames: ["key"]
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
}
IndexDescription {
- Name: "ByValue"
- KeyColumnNames: ["value"]
- IndexImplTableDescriptions: [ {
- PartitionConfig {
- PartitioningPolicy {
- MinPartitionsCount: 10
- SplitByLoadSettings: {
- Enabled: true
- }
- }
+ Name: "ByValue"
+ KeyColumnNames: ["value"]
+ IndexImplTableDescriptions: [ {
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 10
+ SplitByLoadSettings: {
+ Enabled: true
}
- } ]
+ }
+ }
+ } ]
}
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(
- R"(
- ExportToS3Settings {
- endpoint: "localhost:%d"
- scheme: HTTP
- items {
- source_path: "/MyRoot/Table"
- destination_prefix: ""
- }
- }
- )",
- port
- )
- );
- env.TestWaitNotification(runtime, txId);
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
- auto* scheme = s3Mock.GetData().FindPtr("/scheme.pb");
+ auto* scheme = S3Mock().GetData().FindPtr("/scheme.pb");
UNIT_ASSERT(scheme);
CheckTableScheme(*scheme, GetIndexTablePartitioningSettings, CreateProtoComparator(R"(
partitioning_by_size: DISABLED
@@ -2355,23 +2227,16 @@ partitioning_settings {
}
Y_UNIT_TEST(UserSID) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ Env().TestWaitNotification(Runtime(), txId);
const TString request = Sprintf(R"(
ExportToS3Settings {
@@ -2382,41 +2247,35 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port);
+ )", S3Port());
const TString userSID = "user@builtin";
- TestExport(runtime, ++txId, "/MyRoot", request, userSID);
+ TestExport(Runtime(), ++txId, "/MyRoot", request, userSID);
- const auto desc = TestGetExport(runtime, txId, "/MyRoot");
+ const auto desc = TestGetExport(Runtime(), txId, "/MyRoot");
const auto& entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_PREPARING);
UNIT_ASSERT_VALUES_EQUAL(entry.GetUserSID(), userSID);
}
Y_UNIT_TEST(TablePermissions) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true));
+ EnvOptions().EnablePermissionsExport(true);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
NACLib::TDiffACL diffACL;
diffACL.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericUse, "user@builtin", NACLib::InheritNone);
- TestModifyACL(runtime, ++txId, "/MyRoot", "Table", diffACL.SerializeAsString(), "user@builtin");
- env.TestWaitNotification(runtime, txId);
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
+ TestModifyACL(Runtime(), ++txId, "/MyRoot", "Table", diffACL.SerializeAsString(), "user@builtin");
+ Env().TestWaitNotification(Runtime(), txId);
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -2425,46 +2284,40 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
- env.TestWaitNotification(runtime, txId);
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
- auto* permissions = s3Mock.GetData().FindPtr("/permissions.pb");
+ auto* permissions = S3Mock().GetData().FindPtr("/permissions.pb");
UNIT_ASSERT(permissions);
CheckPermissions(*permissions, CreateProtoComparator(R"(
actions {
- change_owner: "user@builtin"
+ change_owner: "user@builtin"
}
actions {
- grant {
- subject: "user@builtin"
- permission_names: "ydb.generic.use"
- }
+ grant {
+ subject: "user@builtin"
+ permission_names: "ydb.generic.use"
+ }
}
)"));
}
Y_UNIT_TEST(Checksums) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true).EnableChecksumsExport(true));
+ EnvOptions().EnablePermissionsExport(true).EnableChecksumsExport(true);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
+ Env().TestWaitNotification(Runtime(), txId);
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -2473,59 +2326,53 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
- env.TestWaitNotification(runtime, txId);
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
- UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);
- const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
+ UNIT_ASSERT_VALUES_EQUAL(S3Mock().GetData().size(), 8);
+ const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256");
UNIT_ASSERT(dataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");
- const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
+ const auto* metadataChecksum = S3Mock().GetData().FindPtr("/metadata.json.sha256");
UNIT_ASSERT(metadataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");
- const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
+ const auto* schemeChecksum = S3Mock().GetData().FindPtr("/scheme.pb.sha256");
UNIT_ASSERT(schemeChecksum);
UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");
- const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256");
+ const auto* permissionsChecksum = S3Mock().GetData().FindPtr("/permissions.pb.sha256");
UNIT_ASSERT(permissionsChecksum);
UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb");
}
Y_UNIT_TEST(EnableChecksumsPersistance) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true));
+ EnvOptions().EnableChecksumsExport(true);
+ Env(); // Init test env
ui64 txId = 100;
// Create test table
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
// Add some test data
- UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
// Block sending backup task to datashards
- TBlockEvents<TEvDataShard::TEvProposeTransaction> block(runtime, [](auto& ev) {
+ TBlockEvents<TEvDataShard::TEvProposeTransaction> block(Runtime(), [](auto& ev) {
NKikimrTxDataShard::TFlatSchemeTransaction schemeTx;
UNIT_ASSERT(schemeTx.ParseFromString(ev.Get()->Get()->GetTxBody()));
return schemeTx.HasBackup();
});
// Start export and expect it to be blocked
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -2534,57 +2381,51 @@ partitioning_settings {
destination_prefix: ""
}
}
- )", port));
+ )", S3Port()));
- runtime.WaitFor("backup task is sent to datashards", [&]{ return block.size() >= 1; });
+ Runtime().WaitFor("backup task is sent to datashards", [&]{ return block.size() >= 1; });
// Stop blocking new events
block.Stop();
// Reboot SchemeShard to resend backup task
- RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+ RebootTablet(Runtime(), TTestTxConfig::SchemeShard, Runtime().AllocateEdgeActor());
// Wait for export to complete
- env.TestWaitNotification(runtime, txId);
+ Env().TestWaitNotification(Runtime(), txId);
// Verify checksums are created
- UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(S3Mock().GetData().size(), 6);
- const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
+ const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256");
UNIT_ASSERT(dataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");
- const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
+ const auto* metadataChecksum = S3Mock().GetData().FindPtr("/metadata.json.sha256");
UNIT_ASSERT(metadataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");
- const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
+ const auto* schemeChecksum = S3Mock().GetData().FindPtr("/scheme.pb.sha256");
UNIT_ASSERT(schemeChecksum);
UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");
}
Y_UNIT_TEST(ChecksumsWithCompression) {
- TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true));
+ EnvOptions().EnableChecksumsExport(true);
+ Env(); // Init test env
ui64 txId = 100;
- TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ TestCreateTable(Runtime(), ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
- env.TestWaitNotification(runtime, txId);
-
- UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
+ Env().TestWaitNotification(Runtime(), txId);
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
+ UploadRow(Runtime(), "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
- TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TestExport(Runtime(), ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
@@ -2594,10 +2435,10 @@ partitioning_settings {
}
compression: "zstd"
}
- )", port));
- env.TestWaitNotification(runtime, txId);
+ )", S3Port()));
+ Env().TestWaitNotification(Runtime(), txId);
- const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
+ const auto* dataChecksum = S3Mock().GetData().FindPtr("/data_00.csv.sha256");
UNIT_ASSERT(dataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");
}
@@ -2704,15 +2545,7 @@ attributes {
};
Y_UNIT_TEST(Changefeeds) {
- TTestBasicRuntime runtime;
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
- ChangefeedGenerator gen(3, s3Mock);
+ ChangefeedGenerator gen(3, S3Mock());
auto request = Sprintf(R"(
ExportToS3Settings {
@@ -2723,12 +2556,13 @@ attributes {
destination_prefix: ""
}
}
- )", port);
+ )", S3Port());
- TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true));
- runtime.GetAppData().FeatureFlags.SetEnableChangefeedsExport(true);
+ EnvOptions().EnableChecksumsExport(true);
+ Env(); // Init test env
+ Runtime().GetAppData().FeatureFlags.SetEnableChangefeedsExport(true);
- Run(runtime, env, TVector<TString>{
+ Run(Runtime(), Env(), TVector<TString>{
R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
@@ -2741,9 +2575,7 @@ attributes {
}
Y_UNIT_TEST(SchemaMapping) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table1"
Columns { Name: "key" Type: "Utf8" }
@@ -2780,9 +2612,7 @@ attributes {
}
Y_UNIT_TEST(SchemaMappingEncryption) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table1"
Columns { Name: "key" Type: "Utf8" }
@@ -2824,9 +2654,7 @@ attributes {
}
Y_UNIT_TEST(SchemaMappingEncryptionIncorrectKey) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table1"
Columns { Name: "key" Type: "Utf8" }
@@ -2862,9 +2690,7 @@ attributes {
}
Y_UNIT_TEST(EncryptedExport) {
- TTestBasicRuntime runtime;
-
- RunS3(runtime, {
+ RunS3({
R"(
Name: "Table1"
Columns { Name: "key" Type: "Uint32" }
@@ -2899,15 +2725,17 @@ attributes {
}
)");
- UNIT_ASSERT(HasS3File("/my_export/metadata.json"));
- UNIT_ASSERT(HasS3File("/my_export/SchemaMapping/metadata.json.enc"));
- UNIT_ASSERT(HasS3File("/my_export/SchemaMapping/mapping.json.enc"));
- UNIT_ASSERT(HasS3File("/my_export/001/scheme.pb.enc"));
- UNIT_ASSERT(HasS3File("/my_export/001/data_00.csv.enc"));
- UNIT_ASSERT(HasS3File("/my_export/001/data_01.csv.enc"));
- UNIT_ASSERT(HasS3File("/my_export/002/scheme.pb.enc"));
- UNIT_ASSERT(HasS3File("/my_export/002/data_00.csv.enc"));
- UNIT_ASSERT(HasS3File("/my_export/002/data_01.csv.enc"));
+ CheckHasAllS3Files({
+ "/my_export/metadata.json",
+ "/my_export/SchemaMapping/metadata.json.enc",
+ "/my_export/SchemaMapping/mapping.json.enc",
+ "/my_export/001/scheme.pb.enc",
+ "/my_export/001/data_00.csv.enc",
+ "/my_export/001/data_01.csv.enc",
+ "/my_export/002/scheme.pb.enc",
+ "/my_export/002/data_00.csv.enc",
+ "/my_export/002/data_01.csv.enc",
+ });
THashSet<TString> ivs;
for (auto [key, content] : S3Mock().GetData()) {
@@ -2932,15 +2760,6 @@ attributes {
}
Y_UNIT_TEST(AutoDropping) {
- TTestBasicRuntime runtime;
-
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
-
- TS3Mock s3Mock({}, TS3Mock::TSettings(port));
- UNIT_ASSERT(s3Mock.Start());
-
-
auto request = Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
@@ -2950,20 +2769,18 @@ attributes {
destination_prefix: ""
}
}
- )", port);
+ )", S3Port());
- TTestEnv env(runtime);
-
- Run(runtime, env, TVector<TString>{
+ Run(Runtime(), Env(), TVector<TString>{
R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
- Columns { Name: "value" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)",
}, request, Ydb::StatusIds::SUCCESS, "/MyRoot");
- auto desc = DescribePath(runtime, "/MyRoot");
+ auto desc = DescribePath(Runtime(), "/MyRoot");
UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1);
UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table");
}