aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-11-13 23:24:51 +0300
committergvit <gvit@ydb.tech>2022-11-13 23:24:51 +0300
commit1d1f495e76f87ee31598b60ea254edce5e6c78f5 (patch)
tree76a291ea0ae7ea76602e99df6e3cbb3bf2b2ce29
parentb10c413747afc8d9e64c182ac9dbbeebcf767eab (diff)
downloadydb-1d1f495e76f87ee31598b60ea254edce5e6c78f5.tar.gz
switch test to public api
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h16
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp344
2 files changed, 154 insertions, 206 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 68dd97a9fb2..15b89213cac 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -79,22 +79,6 @@ namespace NKqpHelpers {
return request;
}
- inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSimpleRequest(
- const TString& sql,
- const TString& database = {})
- {
- auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- request->Record.MutableRequest()->SetQuery(sql);
- if (!database.empty()) {
- request->Record.MutableRequest()->SetDatabase(database);
- }
- return request;
- }
-
inline void SendRequest(
TTestActorRuntime& runtime,
TActorId sender,
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 91525fd1b19..3cc39c176e5 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -3094,6 +3094,8 @@ Y_UNIT_TEST(TestReadTableWriteConflict) {
// NOTE: table-1 has 2 shards so ReadTable is not immediate
CreateShardedTable(server, sender, "/Root", "table-1", 2);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ TString senderWriteImm = CreateSessionRPC(runtime);
+ TString senderWriteDist = CreateSessionRPC(runtime);
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
@@ -3171,15 +3173,13 @@ Y_UNIT_TEST(TestReadTableWriteConflict) {
// Start an immediate write to table-1, it won't be able to start
// because it arrived after the read table and they block each other
- auto senderWriteImm = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderWriteImm, MakeSimpleRequest(Q_(
- "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)")));
+ auto fWriteImm = SendRequest(runtime, MakeSimpleRequestRPC(Q_(
+ "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"), senderWriteImm, "", true));
// Start a planned write to both tables, wait for its plan step too
- auto senderWriteDist = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderWriteDist, MakeSimpleRequest(Q_(
+ auto fWriteDist = SendRequest(runtime, MakeSimpleRequestRPC(Q_(
"UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4); "
- "UPSERT INTO `/Root/table-2` (key, value) VALUES (8, 4)")));
+ "UPSERT INTO `/Root/table-2` (key, value) VALUES (8, 4)"), senderWriteDist, "", true));
if (seenPlanSteps < 2) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
@@ -3208,16 +3208,14 @@ Y_UNIT_TEST(TestReadTableWriteConflict) {
// Wait for immediate write to succeed
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteImm);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fWriteImm);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// Wait for distributed write to succeed
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteDist);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fWriteDist);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
}
@@ -3243,6 +3241,8 @@ Y_UNIT_TEST(TestReadTableImmediateWriteBlock) {
// NOTE: table-1 has 2 shards so ReadTable is not immediate
CreateShardedTable(server, sender, "/Root", "table-1", 2);
+ TString senderWriteImm = CreateSessionRPC(runtime);
+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
// Capture and block all readset messages
@@ -3282,15 +3282,13 @@ Y_UNIT_TEST(TestReadTableImmediateWriteBlock) {
SimulateSleep(server, TDuration::Seconds(1));
// Start an immediate write to table-1, it should be able to complete
- auto senderWriteImm = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderWriteImm, MakeSimpleRequest(Q_(
- "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)")));
+ auto fWriteImm = SendRequest(runtime, MakeSimpleRequestRPC(Q_(
+ "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3)"), senderWriteImm, "", true));
// Wait for immediate write to succeed
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderWriteImm);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fWriteImm);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
}
@@ -3574,45 +3572,53 @@ void TestLateKqpQueryAfterColumnDrop(bool dataQuery, const TString& query, bool
};
auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+ std::function<void()> processEvents = [&]() {
+ // Wait until there's exactly one propose message at our datashard
+ if (eventsPropose.size() < 1) {
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return eventsPropose.size() >= 1;
+ };
+ runtime.DispatchEvents(options);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(eventsPropose.size(), 1u);
+ Cerr << "--- captured scan tx proposal" << Endl;
+ capturePropose = false;
+
+ // Drop column value2 and wait for drop to finish
+ auto dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value2");
+ WaitTxNotification(server, dropTxId);
+
+ // Resend delayed propose messages
+ Cerr << "--- resending captured proposals" << Endl;
+ for (auto& ev : eventsPropose) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ eventsPropose.clear();
+ return;
+ };
+
if (dataQuery) {
Cerr << "--- sending data query request" << Endl;
- SendRequest(runtime, streamSender, MakeSimpleRequest(query));
+ auto tmp = CreateSessionRPC(runtime);
+ auto f = SendRequest(runtime, MakeSimpleRequestRPC(query, tmp, "", true));
+ processEvents();
+ auto response = AwaitResponse(runtime, f);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::ABORTED);
} else {
Cerr << "--- sending stream request" << Endl;
SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, query, false));
- }
-
- // Wait until there's exactly one propose message at our datashard
- if (eventsPropose.size() < 1) {
- TDispatchOptions options;
- options.CustomFinalCondition = [&]() {
- return eventsPropose.size() >= 1;
- };
- runtime.DispatchEvents(options);
- }
- UNIT_ASSERT_VALUES_EQUAL(eventsPropose.size(), 1u);
- Cerr << "--- captured scan tx proposal" << Endl;
- capturePropose = false;
-
- // Drop column value2 and wait for drop to finish
- auto dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value2");
- WaitTxNotification(server, dropTxId);
+ processEvents();
- // Resend delayed propose messages
- Cerr << "--- resending captured proposals" << Endl;
- for (auto& ev : eventsPropose) {
- runtime.Send(ev.Release(), 0, true);
+ Cerr << "--- waiting for result" << Endl;
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+ auto& response = ev->Get()->Record.GetRef();
+ Cerr << response.DebugString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED);
+ auto& issue = response.GetResponse().GetQueryIssues(0);
+ UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH);
+ UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed.");
}
- eventsPropose.clear();
-
- Cerr << "--- waiting for result" << Endl;
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
- auto& response = ev->Get()->Record.GetRef();
- Cerr << response.DebugString() << Endl;
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED);
- auto& issue = response.GetResponse().GetQueryIssues(0);
- UNIT_ASSERT_VALUES_EQUAL(issue.issue_code(), (int) NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH);
- UNIT_ASSERT_STRINGS_EQUAL(issue.message(), "Table \'/Root/table-1\' scheme changed.");
}
Y_UNIT_TEST_WITH_MVCC(TestLateKqpScanAfterColumnDrop) {
@@ -3643,6 +3649,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ TString senderSession = CreateSessionRPC(runtime);
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0), (1, 1), (2, 2), (3, 3);"));
auto waitFor = [&](const auto& condition, const TString& description) {
@@ -3705,7 +3712,9 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
// future snapshot
snapshot = TRowVersion(lastStep + 1000, Max<ui64>());
- SendRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key")));
+ auto f = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"), senderSession, "", true));
waitFor([&]{ return rewritten; }, "EvProposeTransaction rewritten");
@@ -3714,12 +3723,11 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
waitFor([&]{ return rescheduled; }, "EvProposeTransaction rescheduled");
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto response = AwaitResponse(runtime, f);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ Ydb::Table::ExecuteQueryResult result;
+ response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT_VALUES_EQUAL(FormatResult(result), "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}
auto tmp = std::exchange(snapshot, TRowVersion::Min());
@@ -3729,24 +3737,16 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 10);"));
{
- auto ev = ExecRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key")));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 10 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"));
+ UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 2 } items { uint32_value: 10 } }");
}
snapshot = tmp;
rescheduled = false;
{
- auto ev = ExecRequest(runtime, sender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key")));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 2 ORDER BY key"));
+ UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}
UNIT_ASSERT(!rescheduled);
@@ -3858,6 +3858,7 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
TString sessionId = CreateSessionRPC(runtime);
+ TString sender3Session = CreateSessionRPC(runtime);
TString txId;
{
@@ -3912,10 +3913,9 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) {
// Now send an upsert to table-1, it should be blocked by our in-progress tx
delayedProposeCount = 0;
- auto sender3 = runtime.AllocateEdgeActor();
Cerr << "... sending immediate upsert" << Endl;
- SendRequest(runtime, sender3, MakeSimpleRequest(Q_(R"(
- UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42), (3, 51))")));
+ auto f3 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42), (3, 51))"), sender3Session, "", true));
// Wait unti that propose starts to execute
waitFor([&]{ return delayedProposeCount >= 1; }, "immediate propose");
@@ -3928,22 +3928,15 @@ Y_UNIT_TEST_TWIN(TestShardRestartNoUndeterminedImmediate, UseMvcc) {
// The result of immediate upsert must be neither SUCCESS nor UNDETERMINED
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender3);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_UNEQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_UNEQUAL(response.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED);
+ auto response = AwaitResponse(runtime, f3);
+ UNIT_ASSERT_VALUES_UNEQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_UNEQUAL(response.operation().status(), Ydb::StatusIds::UNDETERMINED);
}
// Select key 1 and verify its value was not updated
{
- auto sender4 = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, sender4, MakeSimpleRequest(Q_(R"(
- SELECT key, value FROM `/Root/table-1` WHERE key = 1 ORDER BY key)")));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto result = KqpSimpleExec(runtime, Q_(R"(SELECT key, value FROM `/Root/table-1` WHERE key = 1 ORDER BY key)"));
+ UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
}
}
@@ -4029,14 +4022,8 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, UseMvcc) {
// Select key 3 and verify its value was updated
{
- auto sender4 = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, sender4, MakeSimpleRequest(Q_(R"(
- SELECT key, value FROM `/Root/table-1` WHERE key = 3 ORDER BY key)")));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- TString expected = "Struct { List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 2 } } } } Struct { Bool: false }";
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults()[0].GetValue().ShortDebugString(), expected);
+ auto result = KqpSimpleExec(runtime, Q_(R"(SELECT key, value FROM `/Root/table-1` WHERE key = 3 ORDER BY key)"));
+ UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 3 } items { uint32_value: 2 } }");
}
}
@@ -4116,6 +4103,9 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
TString sessionId1 = CreateSessionRPC(runtime, "/Root");
TString sessionId2 = CreateSessionRPC(runtime, "/Root");
+ TString sq1 = CreateSessionRPC(runtime, "/Root");
+ TString sq2 = CreateSessionRPC(runtime, "/Root");
+
auto f1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
@@ -4166,8 +4156,8 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
// Start blocking commits again and try performing new writes
prevObserver = runtime.SetObserverFunc(blockCommits);
- SendRequest(runtime, sender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"), "/Root"));
- SendRequest(runtime, sender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4)"), "/Root"));
+ SendRequest(runtime, MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"), sq1, "", true), "/Root");
+ SendRequest(runtime, MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4)"), sq2, "", true), "/Root");
waitFor([&]{ return blockedCommits.size() >= 2; }, "at least 2 blocked commits");
// Send an additional read request, it must not be blocked
@@ -4491,6 +4481,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ TString senderImmediateWrite = CreateSessionRPC(runtime);
+
auto table1shards = GetTableShards(server, sender, "/Root/table-1");
auto table2shards = GetTableShards(server, sender, "/Root/table-2");
@@ -4503,12 +4495,7 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
auto execSimpleRequest = [&](const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleExec(runtime, query);
};
auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
@@ -4532,10 +4519,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
// Same when using a fresh snapshot read
UNIT_ASSERT_VALUES_EQUAL(
@@ -4556,10 +4541,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
}
// Send an immediate write transaction, but don't wait for result
- auto senderImmediateWrite = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ auto fImmWrite = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5)
- )")));
+ )"), senderImmediateWrite, "", true));
// We sleep for very little so datashard commits changes, but doesn't advance
SimulateSleep(runtime, TDuration::MicroSeconds(1));
@@ -4570,10 +4554,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
// Same when using a fresh snapshot read
UNIT_ASSERT_VALUES_EQUAL(
@@ -4586,9 +4568,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
// Wait for the write to finish
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fImmWrite);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// Perform an immediate read again, it should observe the write above
@@ -4597,11 +4578,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }");
// Same when using a fresh snapshot read
UNIT_ASSERT_VALUES_EQUAL(
@@ -4614,9 +4593,10 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
"{ items { uint32_value: 5 } items { uint32_value: 5 } }");
// Start a new write and sleep again
- SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ fImmWrite = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7)
- )")));
+ )"), senderImmediateWrite, "", true));
+
SimulateSleep(runtime, TDuration::MicroSeconds(1));
// Verify this write is not observed yet
@@ -4625,11 +4605,9 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }");
// Spam schedules in the runtime to prevent mediator time jumping prematurely
{
@@ -4649,17 +4627,16 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }");
// Send one more write and sleep again
- auto senderImmediateWrite2 = runtime.AllocateEdgeActor();
- SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"(
+ auto senderImmediateWrite2 = CreateSessionRPC(runtime);
+ auto fImmWrite2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9)
- )")));
+ )"), senderImmediateWrite2, "", true));
+
SimulateSleep(runtime, TDuration::MicroSeconds(1));
// Verify it is also hidden at the moment
@@ -4668,11 +4645,10 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }");
+
UNIT_ASSERT_VALUES_EQUAL(
execSnapshotRequest(Q_(R"(
SELECT key, value FROM `/Root/table-1`
@@ -4684,9 +4660,8 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
// Wait for result of the second write
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fImmWrite2);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// We should finally observe both writes
@@ -4695,13 +4670,11 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
- "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
- "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }, "
+ "{ items { uint32_value: 7 } items { uint32_value: 7 } }, "
+ "{ items { uint32_value: 9 } items { uint32_value: 9 } }");
TString snapshotSessionId, snapshotTxId;
UNIT_ASSERT_VALUES_EQUAL(
@@ -4871,12 +4844,7 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) {
SimulateSleep(server, TDuration::Seconds(1));
auto execSimpleRequest = [&](const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleExec(runtime, query);
};
auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
@@ -4900,9 +4868,7 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
// Same when using a fresh snapshot read
TString sessionId, txId;
@@ -4932,10 +4898,8 @@ Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
// Previous snapshot must see original data
UNIT_ASSERT_VALUES_EQUAL(
@@ -5227,6 +5191,9 @@ Y_UNIT_TEST(UncommittedReads) {
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+ TString upsertSender = CreateSessionRPC(runtime);
+ TString readSender = CreateSessionRPC(runtime);
+
// Block commits and start counting propose responses
TVector<THolder<IEventHandle>> blockedCommits;
size_t seenProposeResults = 0;
@@ -5263,14 +5230,16 @@ Y_UNIT_TEST(UncommittedReads) {
};
// Start upserting a row with blocked commits, it will stick to the same version as the last upsert
- auto upsertSender = runtime.AllocateEdgeActor();
- SendRequest(runtime, upsertSender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)")));
+ auto fUpsert = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)"), upsertSender, "", true));
waitFor([&]{ return blockedCommits.size() > 0; }, "blocked commit");
// Start reading data, we know it must read confirmed data, but it will also include the blocked row above
- auto readSender = runtime.AllocateEdgeActor();
- SendRequest(runtime, readSender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")));
+ auto fRead = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key"), readSender, "", true));
// Sleep for 1 second
SimulateSleep(runtime, TDuration::Seconds(1));
@@ -5279,17 +5248,15 @@ Y_UNIT_TEST(UncommittedReads) {
if (seenProposeResults > 0) {
// We might make it possible in the future to run reads like that without blocking
// However, it still means we must not return the 4th row that is not committed
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ auto response = AwaitResponse(runtime, fRead);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ Ydb::Table::ExecuteQueryResult result;
+ response.operation().result().UnpackTo(&result);
UNIT_ASSERT_VALUES_EQUAL(
- response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "} Struct { Bool: false }");
+ FormatResult(result),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
return;
}
@@ -5302,25 +5269,22 @@ Y_UNIT_TEST(UncommittedReads) {
// We must successfully upsert the row
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(upsertSender);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto response = AwaitResponse(runtime, fUpsert);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
// We must successfully read including the 4th row
{
- auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ auto response = AwaitResponse(runtime, fRead);
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ Ydb::Table::ExecuteQueryResult result;
+ response.operation().result().UnpackTo(&result);
UNIT_ASSERT_VALUES_EQUAL(
- response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
- "List { Struct { Optional { Uint32: 4 } } Struct { Optional { Uint32: 4 } } } "
- "} Struct { Bool: false }");
+ FormatResult(result),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 4 } items { uint32_value: 4 } }");
}
}