diff options
author | gvit <gvit@ydb.tech> | 2022-11-13 23:24:51 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-11-13 23:24:51 +0300 |
commit | 1d1f495e76f87ee31598b60ea254edce5e6c78f5 (patch) | |
tree | 76a291ea0ae7ea76602e99df6e3cbb3bf2b2ce29 | |
parent | b10c413747afc8d9e64c182ac9dbbeebcf767eab (diff) | |
download | ydb-1d1f495e76f87ee31598b60ea254edce5e6c78f5.tar.gz |
switch test to public api
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common_kqp.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 344 |
2 files changed, 154 insertions, 206 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 68dd97a9fb2..15b89213cac 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -79,22 +79,6 @@ namespace NKqpHelpers { return request; } - inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSimpleRequest( - const TString& sql, - const TString& database = {}) - { - auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - request->Record.MutableRequest()->SetQuery(sql); - if (!database.empty()) { - request->Record.MutableRequest()->SetDatabase(database); - } - return request; - } - inline void SendRequest( TTestActorRuntime& runtime, TActorId sender, diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 91525fd1b19..3cc39c176e5 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -3094,6 +3094,8 @@ Y_UNIT_TEST(TestReadTableWriteConflict) { // NOTE: table-1 has 2 shards so ReadTable is not immediate CreateShardedTable(server, sender, "/Root", "table-1", 2); CreateShardedTable(server, sender, "/Root", "table-2", 1); + TString senderWriteImm = CreateSessionRPC(runtime); + TString senderWriteDist = CreateSessionRPC(runtime); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); @@ -3171,15 +3173,13 @@ Y_UNIT_TEST(TestReadTableWriteConflict) { // Start an immediate write to table-1, it won't be able to start // because it arrived after the read table and they block each other - auto senderWriteImm = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderWriteImm, MakeSimpleRequest(Q_( - "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"))); + auto fWriteImm = SendRequest(runtime, MakeSimpleRequestRPC(Q_( + "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"), senderWriteImm, "", true)); // Start a planned write to both tables, wait for its plan step too - auto senderWriteDist = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderWriteDist, MakeSimpleRequest(Q_( + auto fWriteDist = SendRequest(runtime, MakeSimpleRequestRPC(Q_( "UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4); " - "UPSERT INTO `/Root/table-2` (key, value) VALUES (8, 4)"))); + "UPSERT INTO `/Root/table-2` (key, value) VALUES (8, 4)"), senderWriteDist, "", true)); if (seenPlanSteps < 2) { TDispatchOptions options; options.FinalEvents.emplace_back( @@ -3208,16 +3208,14 @@ Y_UNIT_TEST(TestReadTableWriteConflict) { // Wait for immediate write to succeed { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteImm); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fWriteImm); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // Wait for distributed write to succeed { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteDist); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fWriteDist); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } } @@ -3243,6 +3241,8 @@ Y_UNIT_TEST(TestReadTableImmediateWriteBlock) { // NOTE: table-1 has 2 shards so ReadTable is not immediate CreateShardedTable(server, sender, "/Root", "table-1", 2); + TString senderWriteImm = CreateSessionRPC(runtime); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); // Capture and block all readset messages @@ -3282,15 +3282,13 @@ Y_UNIT_TEST(TestReadTableImmediateWriteBlock) { SimulateSleep(server, TDuration::Seconds(1)); // Start an immediate write to table-1, it should be able to complete - auto senderWriteImm = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderWriteImm, MakeSimpleRequest(Q_( - "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"))); + auto fWriteImm = SendRequest(runtime, MakeSimpleRequestRPC(Q_( + "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"), senderWriteImm, "", true)); // Wait for immediate write to succeed { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteImm); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fWriteImm); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } } @@ -3574,45 +3572,53 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query, bool }; auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + std::function<void()> processEvents = [&]() { + // Wait until there's exactly one propose message at our datashard + if (eventsPropose.size() < 1) { + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return eventsPropose.size() >= 1; + }; + runtime.DispatchEvents(options); + } + UNIT_ASSERT_VALUES_EQUAL(eventsPropose.size(), 1u); + Cerr << "--- captured scan tx proposal" << Endl; + capturePropose = false; + + // Drop column value2 and wait for drop to finish + auto dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value2"); + WaitTxNotification(server, dropTxId); + + // Resend delayed propose messages + Cerr << "--- resending captured proposals" << Endl; + for (auto& ev : eventsPropose) { + runtime.Send(ev.Release(), 0, true); + } + eventsPropose.clear(); + return; + }; + if (dataQuery) { Cerr << "--- sending data query request" << Endl; - SendRequest(runtime, streamSender, MakeSimpleRequest(query)); + auto tmp = CreateSessionRPC(runtime); + auto f = SendRequest(runtime, MakeSimpleRequestRPC(query, tmp, "", true)); + processEvents(); + auto response = AwaitResponse(runtime, f); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::ABORTED); } else { Cerr << "--- sending stream request" << Endl; SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, query, false)); - } - - // Wait until there's exactly one propose message at our datashard - if (eventsPropose.size() < 1) { - TDispatchOptions options; - options.CustomFinalCondition = [&]() { - return eventsPropose.size() >= 1; - }; - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(eventsPropose.size(), 1u); - Cerr << "--- captured scan tx proposal" << Endl; - capturePropose = false; - - // Drop column value2 and wait for drop to finish - auto dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value2"); - WaitTxNotification(server, dropTxId); + processEvents(); - // Resend delayed propose messages - Cerr << "--- resending captured proposals" << Endl; - for (auto& ev : eventsPropose) { - runtime.Send(ev.Release(), 0, true); + Cerr << "--- waiting for result" << Endl; + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + auto& response = ev->Get()->Record.GetRef(); + Cerr << response.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED); + auto& issue = response.GetResponse().GetQueryIssues(0); + UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH); + UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed."); } - eventsPropose.clear(); - - Cerr << "--- waiting for result" << Endl; - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); - auto& response = ev->Get()->Record.GetRef(); - Cerr << response.DebugString() << Endl; - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED); - auto& issue = response.GetResponse().GetQueryIssues(0); - UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH); - UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed."); } Y_UNIT_TEST_WITH_MVCC(TestLateKqpScanAfterColumnDrop) { @@ -3643,6 +3649,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { CreateShardedTable(server, sender, "/Root", "table-1", 1); + TString senderSession = CreateSessionRPC(runtime); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0), (1, 1), (2, 2), (3, 3);")); auto waitFor = [&](const auto& condition, const TString& description) { @@ -3705,7 +3712,9 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { // future snapshot snapshot = TRowVersion(lastStep + 1000, Max<ui64>()); - SendRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"))); + auto f = SendRequest( + runtime, + MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"), senderSession, "", true)); waitFor([&]{ return rewritten; }, "EvProposeTransaction rewritten"); @@ -3714,12 +3723,11 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { waitFor([&]{ return rescheduled; }, "EvProposeTransaction rescheduled"); { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto response = AwaitResponse(runtime, f); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + Ydb::Table::ExecuteQueryResult result; + response.operation().result().UnpackTo(&result); + UNIT_ASSERT_VALUES_EQUAL(FormatResult(result), "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); } auto tmp = std::exchange(snapshot, TRowVersion::Min()); @@ -3729,24 +3737,16 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 10);")); { - auto ev = ExecRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"))); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 10 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key")); + UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 2 } items { uint32_value: 10 } }"); } snapshot = tmp; rescheduled = false; { - auto ev = ExecRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"))); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key")); + UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); } UNIT_ASSERT(!rescheduled); @@ -3858,6 +3858,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); TString sessionId = CreateSessionRPC(runtime); + TString sender3Session = CreateSessionRPC(runtime); TString txId; { @@ -3912,10 +3913,9 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) { // Now send an upsert to table-1, it should be blocked by our in-progress tx delayedProposeCount = 0; - auto sender3 = runtime.AllocateEdgeActor(); Cerr << "... sending immediate upsert" << Endl; - SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"( - UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42), (3, 51))"))); + auto f3 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42), (3, 51))"), sender3Session, "", true)); // Wait unti that propose starts to execute waitFor([&]{ return delayedProposeCount >= 1; }, "immediate propose"); @@ -3928,22 +3928,15 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) { // The result of immediate upsert must be neither SUCCESS nor UNDETERMINED { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender3); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_UNEQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_UNEQUAL(response.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED); + auto response = AwaitResponse(runtime, f3); + UNIT_ASSERT_VALUES_UNEQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_UNEQUAL(response.operation().status(), Ydb::StatusIds::UNDETERMINED); } // Select key 1 and verify its value was not updated { - auto sender4 = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, sender4, MakeSimpleRequest(Q_(R"( - SELECT key, value FROM `/Root/table-1` WHERE key = 1 ORDER BY key)"))); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto result = KqpSimpleExec(runtime, Q_(R"(SELECT key, value FROM `/Root/table-1` WHERE key = 1 ORDER BY key)")); + UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); } } @@ -4029,14 +4022,8 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, UseMvcc) { // Select key 3 and verify its value was updated { - auto sender4 = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, sender4, MakeSimpleRequest(Q_(R"( - SELECT key, value FROM `/Root/table-1` WHERE key = 3 ORDER BY key)"))); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - TString expected = "Struct { List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }"; - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected); + auto result = KqpSimpleExec(runtime, Q_(R"(SELECT key, value FROM `/Root/table-1` WHERE key = 3 ORDER BY key)")); + UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 3 } items { uint32_value: 2 } }"); } } @@ -4116,6 +4103,9 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) { TString sessionId1 = CreateSessionRPC(runtime, "/Root"); TString sessionId2 = CreateSessionRPC(runtime, "/Root"); + TString sq1 = CreateSessionRPC(runtime, "/Root"); + TString sq2 = CreateSessionRPC(runtime, "/Root"); + auto f1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL @@ -4166,8 +4156,8 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) { // Start blocking commits again and try performing new writes prevObserver = runtime.SetObserverFunc(blockCommits); - SendRequest(runtime, sender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"), "/Root")); - SendRequest(runtime, sender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4)"), "/Root")); + SendRequest(runtime, MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"), sq1, "", true), "/Root"); + SendRequest(runtime, MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4)"), sq2, "", true), "/Root"); waitFor([&]{ return blockedCommits.size() >= 2; }, "at least 2 blocked commits"); // Send an additional read request, it must not be blocked @@ -4491,6 +4481,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { CreateShardedTable(server, sender, "/Root", "table-1", 1); CreateShardedTable(server, sender, "/Root", "table-2", 1); + TString senderImmediateWrite = CreateSessionRPC(runtime); + auto table1shards = GetTableShards(server, sender, "/Root/table-1"); auto table2shards = GetTableShards(server, sender, "/Root/table-2"); @@ -4503,12 +4495,7 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)")); auto execSimpleRequest = [&](const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleExec(runtime, query); }; auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { @@ -4532,10 +4519,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); // Same when using a fresh snapshot read UNIT_ASSERT_VALUES_EQUAL( @@ -4556,10 +4541,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { } // Send an immediate write transaction, but don't wait for result - auto senderImmediateWrite = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + auto fImmWrite = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5) - )"))); + )"), senderImmediateWrite, "", true)); // We sleep for very little so datashard commits changes, but doesn't advance SimulateSleep(runtime, TDuration::MicroSeconds(1)); @@ -4570,10 +4554,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); // Same when using a fresh snapshot read UNIT_ASSERT_VALUES_EQUAL( @@ -4586,9 +4568,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { // Wait for the write to finish { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fImmWrite); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // Perform an immediate read again, it should observe the write above @@ -4597,11 +4578,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); // Same when using a fresh snapshot read UNIT_ASSERT_VALUES_EQUAL( @@ -4614,9 +4593,10 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); // Start a new write and sleep again - SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + fImmWrite = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7) - )"))); + )"), senderImmediateWrite, "", true)); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); // Verify this write is not observed yet @@ -4625,11 +4605,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); // Spam schedules in the runtime to prevent mediator time jumping prematurely { @@ -4649,17 +4627,16 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); // Send one more write and sleep again - auto senderImmediateWrite2 = runtime.AllocateEdgeActor(); - SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"( + auto senderImmediateWrite2 = CreateSessionRPC(runtime); + auto fImmWrite2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9) - )"))); + )"), senderImmediateWrite2, "", true)); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); // Verify it is also hidden at the moment @@ -4668,11 +4645,10 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }"); + UNIT_ASSERT_VALUES_EQUAL( execSnapshotRequest(Q_(R"( SELECT key, value FROM `/Root/table-1` @@ -4684,9 +4660,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { // Wait for result of the second write { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fImmWrite2); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // We should finally observe both writes @@ -4695,13 +4670,11 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " - "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " - "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 5 } }, " + "{ items { uint32_value: 7 } items { uint32_value: 7 } }, " + "{ items { uint32_value: 9 } items { uint32_value: 9 } }"); TString snapshotSessionId, snapshotTxId; UNIT_ASSERT_VALUES_EQUAL( @@ -4871,12 +4844,7 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) { SimulateSleep(server, TDuration::Seconds(1)); auto execSimpleRequest = [&](const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleExec(runtime, query); }; auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { @@ -4900,9 +4868,7 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); // Same when using a fresh snapshot read TString sessionId, txId; @@ -4932,10 +4898,8 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); // Previous snapshot must see original data UNIT_ASSERT_VALUES_EQUAL( @@ -5227,6 +5191,9 @@ Y_UNIT_TEST(UncommittedReads) { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)")); + TString upsertSender = CreateSessionRPC(runtime); + TString readSender = CreateSessionRPC(runtime); + // Block commits and start counting propose responses TVector<THolder<IEventHandle>> blockedCommits; size_t seenProposeResults = 0; @@ -5263,14 +5230,16 @@ Y_UNIT_TEST(UncommittedReads) { }; // Start upserting a row with blocked commits, it will stick to the same version as the last upsert - auto upsertSender = runtime.AllocateEdgeActor(); - SendRequest(runtime, upsertSender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)"))); + auto fUpsert = SendRequest( + runtime, + MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)"), upsertSender, "", true)); waitFor([&]{ return blockedCommits.size() > 0; }, "blocked commit"); // Start reading data, we know it must read confirmed data, but it will also include the blocked row above - auto readSender = runtime.AllocateEdgeActor(); - SendRequest(runtime, readSender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key"))); + auto fRead = SendRequest( + runtime, + MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key"), readSender, "", true)); // Sleep for 1 second SimulateSleep(runtime, TDuration::Seconds(1)); @@ -5279,17 +5248,15 @@ Y_UNIT_TEST(UncommittedReads) { if (seenProposeResults > 0) { // We might make it possible in the future to run reads like that without blocking // However, it still means we must not return the 4th row that is not committed - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + auto response = AwaitResponse(runtime, fRead); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + Ydb::Table::ExecuteQueryResult result; + response.operation().result().UnpackTo(&result); UNIT_ASSERT_VALUES_EQUAL( - response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "} Struct { Bool: false }"); + FormatResult(result), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); return; } @@ -5302,25 +5269,22 @@ Y_UNIT_TEST(UncommittedReads) { // We must successfully upsert the row { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(upsertSender); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto response = AwaitResponse(runtime, fUpsert); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } // We must successfully read including the 4th row { - auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + auto response = AwaitResponse(runtime, fRead); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + Ydb::Table::ExecuteQueryResult result; + response.operation().result().UnpackTo(&result); UNIT_ASSERT_VALUES_EQUAL( - response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " - "List { Struct { Optional { Uint32: 4 } } Struct { Optional { Uint32: 4 } } } " - "} Struct { Bool: false }"); + FormatResult(result), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 4 } }"); } } |