aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-11-04 23:39:28 +0300
committergvit <gvit@ydb.tech>2022-11-04 23:39:28 +0300
commitb18765b23396d50f028c837c3c0abd38d9e3ff26 (patch)
treeaf6a69637f10413c9b09e786642d37fe237edf75
parent529b247abeb638694ea67f70b79bd062274b83b3 (diff)
downloadydb-b18765b23396d50f028c837c3c0abd38d9e3ff26.tar.gz
continue switching datashard tests to public api
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h19
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp130
2 files changed, 40 insertions, 109 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 32b7c8e8ad1..eb43679f662 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -50,11 +50,11 @@ namespace NKqpHelpers {
return sessionId;
}
- inline Ydb::Table::ExecuteDataQueryResponse ExecuteDataQueryRPCResponse(
- TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {}) {
- auto future = NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
+ inline NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> SendRequest(
+ TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
+ {
+ return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
std::move(request), database, "" /* token */, runtime.GetActorSystem(0));
- return AwaitResponse(runtime, future);
}
inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC(
@@ -214,10 +214,11 @@ namespace NKqpHelpers {
return JoinSeq(", ", result.result_sets(0).rows());
}
+
inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false) {
TString sessionId = CreateSessionRPC(runtime);
TString txId;
- auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo));
+ auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo)));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
@@ -232,8 +233,8 @@ namespace NKqpHelpers {
inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
sessionId = CreateSessionRPC(runtime);
- Y_VERIFY(txId.empty(), "txId reused between transactions"); // ensure
- auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */));
+ txId.clear();
+ auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
@@ -245,7 +246,7 @@ namespace NKqpHelpers {
inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
Y_VERIFY(!txId.empty(), "continue on empty transaction");
- auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */));
+ auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
@@ -257,7 +258,7 @@ namespace NKqpHelpers {
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
Y_VERIFY(!txId.empty(), "commit on empty transaction");
- auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
+ auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 642483c4f97..2559d8e4c06 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1008,25 +1008,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SimulateSleep(server, TDuration::Seconds(1));
auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- sessionId = CreateSessionRPC(runtime);
- auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- txId = response.GetResponse().GetTxMeta().id();
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleBegin(runtime, sessionId, txId, query);
};
auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
- auto& response = ev->Get()->Record.GetRef();
- if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleContinue(runtime, sessionId, txId, query);
};
auto execSnapshotRequest = [&](const TString& query) -> TString {
@@ -1042,9 +1028,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
SimulateSleep(runtime, TDuration::Seconds(2));
// Create a new snapshot, it should still observe the same state
@@ -1054,9 +1038,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
// Insert a new row and wait for result, this will roll over into a new step
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
@@ -1076,9 +1058,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
UNIT_ASSERT_VALUES_EQUAL(
result,
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
}
UNIT_ASSERT_C(failed, "Snapshot was not cleaned up");
@@ -1113,34 +1093,15 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SimulateSleep(server, TDuration::Seconds(1));
auto execSimpleRequest = [&](const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleExec(runtime, query);
};
auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- sessionId = CreateSessionRPC(runtime);
- auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- txId = response.GetResponse().GetTxMeta().id();
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleBegin(runtime, sessionId, txId, query);
};
auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
- auto& response = ev->Get()->Record.GetRef();
- if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleContinue(runtime, sessionId, txId, query);
};
auto execSnapshotRequest = [&](const TString& query) -> TString {
@@ -1168,9 +1129,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
SimulateSleep(runtime, TDuration::Seconds(2));
bool captureSplit = true;
@@ -1223,9 +1182,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
// Finish the split
captureSplit = false;
@@ -1261,9 +1218,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SELECT key, value FROM `/Root/table-1`
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
// But new immediate read must observe all writes we have performed
UNIT_ASSERT_VALUES_EQUAL(
@@ -1272,10 +1227,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
WHERE key in (1, 2, 3)
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}
Y_UNIT_TEST(MvccSnapshotReadWithLongPlanQueue) {
@@ -1310,25 +1263,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SimulateSleep(server, TDuration::Seconds(1));
auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- sessionId = CreateSessionRPC(runtime);
- auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- txId = response.GetResponse().GetTxMeta().id();
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleBegin(runtime, sessionId, txId, query);
};
auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
- auto& response = ev->Get()->Record.GetRef();
- if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ return KqpSimpleContinue(runtime, sessionId, txId, query);
};
// Prime table1 with a snapshot read
@@ -1340,24 +1279,21 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
WHERE key in (1, 2, 3)
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
}
// Arrange for a distributed tx stuck at readset exchange
- auto senderBlocker = runtime.AllocateEdgeActor();
TString sessionIdBlocker = CreateSessionRPC(runtime);
TString txIdBlocker;
{
- auto ev = ExecRequest(runtime, sender, MakeBeginRequest(sessionIdBlocker, Q_(R"(
+ auto result = KqpSimpleBegin(runtime, sessionIdBlocker, txIdBlocker, Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
- SELECT * FROM `/Root/table-2`)")));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- txIdBlocker = response.GetResponse().GetTxMeta().id();
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ SELECT * FROM `/Root/table-2`)"));
+ UNIT_ASSERT_VALUES_EQUAL(
+ result,
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
}
auto waitFor = [&](const auto& condition, const TString& description) {
@@ -1397,9 +1333,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);
// Send a commit request, it would block on readset exchange
- SendRequest(runtime, senderBlocker, MakeCommitRequest(sessionIdBlocker, txIdBlocker, Q_(R"(
+ SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (99, 99);
- UPSERT INTO `/Root/table-2` (key, value) VALUES (99, 99))")));
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (99, 99); )"), sessionIdBlocker, txIdBlocker, true));
waitFor([&] { return readSets.size() >= 2; }, "2 blocked readsets");
@@ -1427,10 +1363,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
WHERE key in (1, 2, 3)
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
// Now schedule creation of 10 more snapshots
for (int i = 0; i < 10; ++i) {
@@ -1447,10 +1381,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
WHERE key in (1, 2, 3)
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
// Insert one more row, in a buggy case it would be assigned a version below the snapshot
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
@@ -1462,10 +1394,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
WHERE key in (1, 2, 3)
ORDER BY key
)")),
- "Struct { "
- "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
- "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
- "} Struct { Bool: false }");
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}
struct TLockSnapshot {