diff options
author | gvit <gvit@ydb.tech> | 2022-11-13 10:39:42 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-11-13 10:39:42 +0300 |
commit | 277cd4ec47419923e9cfabd3ea4726437cbfa53c (patch) | |
tree | 2e3a92e0da31b0046a31789688f63f17b16007b9 | |
parent | 96f4cf9c56064bacc3b2aa87f0c52a2bcf622135 (diff) | |
download | ydb-277cd4ec47419923e9cfabd3ea4726437cbfa53c.tar.gz |
switch test to public api
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 107 |
1 files changed, 48 insertions, 59 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 7ab261310c4..91525fd1b19 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -2273,6 +2273,8 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); TString sessionId = CreateSessionRPC(runtime); + TString sessionId3 = CreateSessionRPC(runtime); + TString sessionId4 = CreateSessionRPC(runtime); TString txId; { @@ -2342,14 +2344,13 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u); // Send some more requests that form a staircase, they would all be blocked as well - auto sender3 = runtime.AllocateEdgeActor(); - SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"( + auto f3 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3), (5, 3); - UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 3), (6, 3))"))); + UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 3), (6, 3))"), sessionId3, "", true)); SimulateSleep(server, TDuration::Seconds(1)); - SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"( + auto f4 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 4), (7, 4); - UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 4), (8, 4))"))); + UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 4), (8, 4))"), sessionId4, "", true)); SimulateSleep(server, TDuration::Seconds(1)); // One more request that would be executed out of order @@ -2359,13 +2360,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { // Select key 7, we expect a timeout, because logically a write to it already happened { - auto sender4 = runtime.AllocateEdgeActor(); - auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;")); - req->Record.MutableRequest()->SetCancelAfterMs(1000); - req->Record.MutableRequest()->SetTimeoutMs(1000); - auto ev = ExecRequest(runtime, sender4, std::move(req)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::TIMEOUT); + auto sender4 = CreateSessionRPC(runtime); + auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender4, "", true); + req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); + req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); + auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } // Reboot table-1 tablet @@ -2401,13 +2401,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { // Select key 7 again, we still expect a timeout, because logically a write to it already happened { - auto sender5 = runtime.AllocateEdgeActor(); - auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;")); - req->Record.MutableRequest()->SetCancelAfterMs(1000); - req->Record.MutableRequest()->SetTimeoutMs(1000); - auto ev = ExecRequest(runtime, sender5, std::move(req)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::TIMEOUT); + auto sender5 = CreateSessionRPC(runtime); + auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender5, "", true); + req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); + req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); + auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } // Stop blocking readsets and unblock progress @@ -2421,14 +2420,8 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) { // Select key 7 again, this time is should succeed { - auto sender6 = runtime.AllocateEdgeActor(); - auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;")); - auto ev = ExecRequest(runtime, sender6, std::move(req)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 4 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;")); + UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 7 } items { uint32_value: 4 } }"); } } @@ -2455,6 +2448,7 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); TString sessionId = CreateSessionRPC(runtime); + TString sessionRead = CreateSessionRPC(runtime); TString txId; { @@ -2520,11 +2514,10 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) { captureTxProposes = true; // Now we send a distributed read, while stopping coordinator proposals - auto senderRead = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderRead, MakeSimpleRequest(Q_(R"( + auto fRead = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL - SELECT * FROM `/Root/table-2`)"))); + SELECT * FROM `/Root/table-2`)"), sessionRead, "", true)); // Wait until we capture the propose request if (txProposes.size() < 1) { @@ -2608,9 +2601,8 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) { // Wait for distributed read to complete, it must succeed { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fRead); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } } @@ -2635,6 +2627,9 @@ Y_UNIT_TEST(TestPlannedCancelSplit) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); + TString senderRead2 = CreateSessionRPC(runtime); + TString senderRead1 = CreateSessionRPC(runtime); + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); auto shards2 = GetTableShards(server, sender, "/Root/table-2"); @@ -2682,11 +2677,10 @@ Y_UNIT_TEST(TestPlannedCancelSplit) { // Send a distributed read while capturing propose results captureTxProposeResult = true; - auto senderRead1 = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderRead1, MakeSimpleRequest(Q_(R"( + auto fRead1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL - SELECT * FROM `/Root/table-2`)"))); + SELECT * FROM `/Root/table-2`)"), senderRead1, "", true)); if (txProposeResults.size() < 2) { TDispatchOptions options; options.FinalEvents.emplace_back( @@ -2715,18 +2709,16 @@ Y_UNIT_TEST(TestPlannedCancelSplit) { // Wait for the first query result, it must succeed { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead1); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fRead1); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // Send a distributed read again, while blocking propose messages captureTxPropose = true; - auto senderRead2 = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderRead2, MakeSimpleRequest(Q_(R"( + auto fRead2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL - SELECT * FROM `/Root/table-2`)"))); + SELECT * FROM `/Root/table-2`)"), senderRead2, "", true)); if (txProposes.size() < 2) { TDispatchOptions options; options.FinalEvents.emplace_back( @@ -2778,9 +2770,8 @@ Y_UNIT_TEST(TestPlannedCancelSplit) { // Wait for query to return an error { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead2); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::OVERLOADED); + auto response = AwaitResponse(runtime, fRead2); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::OVERLOADED); } // Sleep a little so in case of a bug transaction is left in WaitForPlan state @@ -2819,6 +2810,7 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) { CreateShardedTable(server, sender, "/Root", "table-1", 1); CreateShardedTable(server, sender, "/Root", "table-2", 1); + TString senderWrite1 = CreateSessionRPC(runtime); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); @@ -2845,11 +2837,10 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) { auto prevObserverFunc = runtime.SetObserverFunc(captureMessages); // Send a distributed write while capturing coordinator propose - auto senderWrite1 = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderWrite1, MakeSimpleRequest(Q_(R"( + auto fWrite1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (101, 101); UPSERT INTO `/Root/table-2` (key, value) VALUES (202, 202); - )"))); + )"), senderWrite1, "", true)); if (txProposes.size() < 1) { TDispatchOptions options; options.FinalEvents.emplace_back( @@ -2914,9 +2905,8 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) { // Wait for query to return an error { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWrite1); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE); + auto response = AwaitResponse(runtime, fWrite1); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAVAILABLE); } } @@ -2938,6 +2928,7 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) { CreateShardedTable(server, sender, "/Root", "table-1", 1); CreateShardedTable(server, sender, "/Root", "table-2", 1); + TString senderWrite1 = CreateSessionRPC(runtime); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); @@ -2981,11 +2972,10 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) { auto prevObserverFunc = runtime.SetObserverFunc(captureMessages); // Send a distributed write while capturing coordinator propose - auto senderWrite1 = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderWrite1, MakeSimpleRequest(Q_(R"( + auto fWrite1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (101, 101); UPSERT INTO `/Root/table-2` (key, value) VALUES (202, 202); - )"))); + )"), senderWrite1, "", true)); if (txProposes.size() < 1 || txProposeResults.size() < 1) { TDispatchOptions options; options.FinalEvents.emplace_back( @@ -3053,12 +3043,11 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) { // Wait for query to return an error { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWrite1); - auto& response = ev->Get()->Record.GetRef(); + auto response = AwaitResponse(runtime, fWrite1); UNIT_ASSERT_C( - response.GetYdbStatus() == Ydb::StatusIds::OVERLOADED || - response.GetYdbStatus() == Ydb::StatusIds::UNAVAILABLE, - "Status: " << response.GetYdbStatus()); + response.operation().status() == Ydb::StatusIds::OVERLOADED || + response.operation().status() == Ydb::StatusIds::UNAVAILABLE, + "Status: " << response.operation().status()); } } |