aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-11-13 10:39:42 +0300
committergvit <gvit@ydb.tech>2022-11-13 10:39:42 +0300
commit277cd4ec47419923e9cfabd3ea4726437cbfa53c (patch)
tree2e3a92e0da31b0046a31789688f63f17b16007b9
parent96f4cf9c56064bacc3b2aa87f0c52a2bcf622135 (diff)
downloadydb-277cd4ec47419923e9cfabd3ea4726437cbfa53c.tar.gz
switch test to public api
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp107
1 files changed, 48 insertions, 59 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 7ab261310c4..91525fd1b19 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -2273,6 +2273,8 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
TString sessionId = CreateSessionRPC(runtime);
+ TString sessionId3 = CreateSessionRPC(runtime);
+ TString sessionId4 = CreateSessionRPC(runtime);
TString txId;
{
@@ -2342,14 +2344,13 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
// Send some more requests that form a staircase, they would all be blocked as well
- auto sender3 = runtime.AllocateEdgeActor();
- SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"(
+ auto f3 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3), (5, 3);
- UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 3), (6, 3))")));
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 3), (6, 3))"), sessionId3, "", true));
SimulateSleep(server, TDuration::Seconds(1));
- SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"(
+ auto f4 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 4), (7, 4);
- UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 4), (8, 4))")));
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 4), (8, 4))"), sessionId4, "", true));
SimulateSleep(server, TDuration::Seconds(1));
// One more request that would be executed out of order
@@ -2359,13 +2360,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
// Select key 7, we expect a timeout, because logically a write to it already happened
{
- auto sender4 = runtime.AllocateEdgeActor();
- auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"));
- req->Record.MutableRequest()->SetCancelAfterMs(1000);
- req->Record.MutableRequest()->SetTimeoutMs(1000);
- auto ev = ExecRequest(runtime, sender4, std::move(req));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::TIMEOUT);
+ auto sender4 = CreateSessionRPC(runtime);
+ auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender4, "", true);
+ req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
+ req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
+ auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}
// Reboot table-1 tablet
@@ -2401,13 +2401,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
// Select key 7 again, we still expect a timeout, because logically a write to it already happened
{
- auto sender5 = runtime.AllocateEdgeActor();
- auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"));
- req->Record.MutableRequest()->SetCancelAfterMs(1000);
- req->Record.MutableRequest()->SetTimeoutMs(1000);
- auto ev = ExecRequest(runtime, sender5, std::move(req));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::TIMEOUT);
+ auto sender5 = CreateSessionRPC(runtime);
+ auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender5, "", true);
+ req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
+ req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
+ auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}
// Stop blocking readsets and unblock progress
@@ -2421,14 +2420,8 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc) {
// Select key 7 again, this time is should succeed
{
- auto sender6 = runtime.AllocateEdgeActor();
- auto req = MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"));
- auto ev = ExecRequest(runtime, sender6, std::move(req));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 4 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"));
+ UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 7 } items { uint32_value: 4 } }");
}
}
@@ -2455,6 +2448,7 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
TString sessionId = CreateSessionRPC(runtime);
+ TString sessionRead = CreateSessionRPC(runtime);
TString txId;
{
@@ -2520,11 +2514,10 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) {
captureTxProposes = true;
// Now we send a distributed read, while stopping coordinator proposals
- auto senderRead = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderRead, MakeSimpleRequest(Q_(R"(
+ auto fRead = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
- SELECT * FROM `/Root/table-2`)")));
+ SELECT * FROM `/Root/table-2`)"), sessionRead, "", true));
// Wait until we capture the propose request
if (txProposes.size() < 1) {
@@ -2608,9 +2601,8 @@ Y_UNIT_TEST_TWIN(TestCopyTableNoDeadlock, UseMvcc) {
// Wait for distributed read to complete, it must succeed
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fRead);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
}
@@ -2635,6 +2627,9 @@ Y_UNIT_TEST(TestPlannedCancelSplit) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
+ TString senderRead2 = CreateSessionRPC(runtime);
+ TString senderRead1 = CreateSessionRPC(runtime);
+
auto shards1 = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
auto shards2 = GetTableShards(server, sender, "/Root/table-2");
@@ -2682,11 +2677,10 @@ Y_UNIT_TEST(TestPlannedCancelSplit) {
// Send a distributed read while capturing propose results
captureTxProposeResult = true;
- auto senderRead1 = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderRead1, MakeSimpleRequest(Q_(R"(
+ auto fRead1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
- SELECT * FROM `/Root/table-2`)")));
+ SELECT * FROM `/Root/table-2`)"), senderRead1, "", true));
if (txProposeResults.size() < 2) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
@@ -2715,18 +2709,16 @@ Y_UNIT_TEST(TestPlannedCancelSplit) {
// Wait for the first query result, it must succeed
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead1);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fRead1);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// Send a distributed read again, while blocking propose messages
captureTxPropose = true;
- auto senderRead2 = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderRead2, MakeSimpleRequest(Q_(R"(
+ auto fRead2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
- SELECT * FROM `/Root/table-2`)")));
+ SELECT * FROM `/Root/table-2`)"), senderRead2, "", true));
if (txProposes.size() < 2) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
@@ -2778,9 +2770,8 @@ Y_UNIT_TEST(TestPlannedCancelSplit) {
// Wait for query to return an error
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderRead2);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::OVERLOADED);
+ auto response = AwaitResponse(runtime, fRead2);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::OVERLOADED);
}
// Sleep a little so in case of a bug transaction is left in WaitForPlan state
@@ -2819,6 +2810,7 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ TString senderWrite1 = CreateSessionRPC(runtime);
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
@@ -2845,11 +2837,10 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) {
auto prevObserverFunc = runtime.SetObserverFunc(captureMessages);
// Send a distributed write while capturing coordinator propose
- auto senderWrite1 = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderWrite1, MakeSimpleRequest(Q_(R"(
+ auto fWrite1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (101, 101);
UPSERT INTO `/Root/table-2` (key, value) VALUES (202, 202);
- )")));
+ )"), senderWrite1, "", true));
if (txProposes.size() < 1) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
@@ -2914,9 +2905,8 @@ Y_UNIT_TEST_TWIN(TestPlannedTimeoutSplit, UseMvcc) {
// Wait for query to return an error
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWrite1);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE);
+ auto response = AwaitResponse(runtime, fWrite1);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAVAILABLE);
}
}
@@ -2938,6 +2928,7 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ TString senderWrite1 = CreateSessionRPC(runtime);
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
@@ -2981,11 +2972,10 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) {
auto prevObserverFunc = runtime.SetObserverFunc(captureMessages);
// Send a distributed write while capturing coordinator propose
- auto senderWrite1 = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderWrite1, MakeSimpleRequest(Q_(R"(
+ auto fWrite1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (101, 101);
UPSERT INTO `/Root/table-2` (key, value) VALUES (202, 202);
- )")));
+ )"), senderWrite1, "", true));
if (txProposes.size() < 1 || txProposeResults.size() < 1) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
@@ -3053,12 +3043,11 @@ Y_UNIT_TEST_TWIN(TestPlannedHalfOverloadedSplit, UseMvcc) {
// Wait for query to return an error
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWrite1);
- auto& response = ev->Get()->Record.GetRef();
+ auto response = AwaitResponse(runtime, fWrite1);
UNIT_ASSERT_C(
- response.GetYdbStatus() == Ydb::StatusIds::OVERLOADED ||
- response.GetYdbStatus() == Ydb::StatusIds::UNAVAILABLE,
- "Status: " << response.GetYdbStatus());
+ response.operation().status() == Ydb::StatusIds::OVERLOADED ||
+ response.operation().status() == Ydb::StatusIds::UNAVAILABLE,
+ "Status: " << response.operation().status());
}
}