diff options
author | gvit <gvit@ydb.tech> | 2022-11-04 23:39:28 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-11-04 23:39:28 +0300 |
commit | b18765b23396d50f028c837c3c0abd38d9e3ff26 (patch) | |
tree | af6a69637f10413c9b09e786642d37fe237edf75 | |
parent | 529b247abeb638694ea67f70b79bd062274b83b3 (diff) | |
download | ydb-b18765b23396d50f028c837c3c0abd38d9e3ff26.tar.gz |
continue switching datashard tests to public api
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common_kqp.h | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 130 |
2 files changed, 40 insertions, 109 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 32b7c8e8ad1..eb43679f662 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -50,11 +50,11 @@ namespace NKqpHelpers { return sessionId; } - inline Ydb::Table::ExecuteDataQueryResponse ExecuteDataQueryRPCResponse( - TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {}) { - auto future = NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>( + inline NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> SendRequest( + TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {}) + { + return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>( std::move(request), database, "" /* token */, runtime.GetActorSystem(0)); - return AwaitResponse(runtime, future); } inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC( @@ -214,10 +214,11 @@ namespace NKqpHelpers { return JoinSeq(", ", result.result_sets(0).rows()); } + inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false) { TString sessionId = CreateSessionRPC(runtime); TString txId; - auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo)); + auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo))); if (response.operation().status() != Ydb::StatusIds::SUCCESS) { return TStringBuilder() << "ERROR: " << response.operation().status(); } @@ -232,8 +233,8 @@ namespace NKqpHelpers { inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) { sessionId = CreateSessionRPC(runtime); - Y_VERIFY(txId.empty(), "txId reused between transactions"); // ensure - auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)); + txId.clear(); + auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */))); if (response.operation().status() != Ydb::StatusIds::SUCCESS) { return TStringBuilder() << "ERROR: " << response.operation().status(); } @@ -245,7 +246,7 @@ namespace NKqpHelpers { inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) { Y_VERIFY(!txId.empty(), "continue on empty transaction"); - auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)); + auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */))); if (response.operation().status() != Ydb::StatusIds::SUCCESS) { return TStringBuilder() << "ERROR: " << response.operation().status(); } @@ -257,7 +258,7 @@ namespace NKqpHelpers { inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) { Y_VERIFY(!txId.empty(), "commit on empty transaction"); - auto response = ExecuteDataQueryRPCResponse(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)); + auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */))); if (response.operation().status() != Ydb::StatusIds::SUCCESS) { return TStringBuilder() << "ERROR: " << response.operation().status(); } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 642483c4f97..2559d8e4c06 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1008,25 +1008,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SimulateSleep(server, TDuration::Seconds(1)); auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - sessionId = CreateSessionRPC(runtime); - auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - txId = response.GetResponse().GetTxMeta().id(); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleBegin(runtime, sessionId, txId, query); }; auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); - auto& response = ev->Get()->Record.GetRef(); - if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleContinue(runtime, sessionId, txId, query); }; auto execSnapshotRequest = [&](const TString& query) -> TString { @@ -1042,9 +1028,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); SimulateSleep(runtime, TDuration::Seconds(2)); // Create a new snapshot, it should still observe the same state @@ -1054,9 +1038,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); // Insert a new row and wait for result, this will roll over into a new step ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)")); @@ -1076,9 +1058,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } UNIT_ASSERT_VALUES_EQUAL( result, - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); } UNIT_ASSERT_C(failed, "Snapshot was not cleaned up"); @@ -1113,34 +1093,15 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SimulateSleep(server, TDuration::Seconds(1)); auto execSimpleRequest = [&](const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleExec(runtime, query); }; auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - sessionId = CreateSessionRPC(runtime); - auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - txId = response.GetResponse().GetTxMeta().id(); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleBegin(runtime, sessionId, txId, query); }; auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); - auto& response = ev->Get()->Record.GetRef(); - if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleContinue(runtime, sessionId, txId, query); }; auto execSnapshotRequest = [&](const TString& query) -> TString { @@ -1168,9 +1129,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); SimulateSleep(runtime, TDuration::Seconds(2)); bool captureSplit = true; @@ -1223,9 +1182,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); // Finish the split captureSplit = false; @@ -1261,9 +1218,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SELECT key, value FROM `/Root/table-1` ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); // But new immediate read must observe all writes we have performed UNIT_ASSERT_VALUES_EQUAL( @@ -1272,10 +1227,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { WHERE key in (1, 2, 3) ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); } Y_UNIT_TEST(MvccSnapshotReadWithLongPlanQueue) { @@ -1310,25 +1263,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SimulateSleep(server, TDuration::Seconds(1)); auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - sessionId = CreateSessionRPC(runtime); - auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - txId = response.GetResponse().GetTxMeta().id(); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleBegin(runtime, sessionId, txId, query); }; auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); - auto& response = ev->Get()->Record.GetRef(); - if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + return KqpSimpleContinue(runtime, sessionId, txId, query); }; // Prime table1 with a snapshot read @@ -1340,24 +1279,21 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { WHERE key in (1, 2, 3) ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); } // Arrange for a distributed tx stuck at readset exchange - auto senderBlocker = runtime.AllocateEdgeActor(); TString sessionIdBlocker = CreateSessionRPC(runtime); TString txIdBlocker; { - auto ev = ExecRequest(runtime, sender, MakeBeginRequest(sessionIdBlocker, Q_(R"( + auto result = KqpSimpleBegin(runtime, sessionIdBlocker, txIdBlocker, Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL - SELECT * FROM `/Root/table-2`)"))); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - txIdBlocker = response.GetResponse().GetTxMeta().id(); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + SELECT * FROM `/Root/table-2`)")); + UNIT_ASSERT_VALUES_EQUAL( + result, + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); } auto waitFor = [&](const auto& condition, const TString& description) { @@ -1397,9 +1333,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { auto prevObserverFunc = runtime.SetObserverFunc(captureRS); // Send a commit request, it would block on readset exchange - SendRequest(runtime, senderBlocker, MakeCommitRequest(sessionIdBlocker, txIdBlocker, Q_(R"( + SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (99, 99); - UPSERT INTO `/Root/table-2` (key, value) VALUES (99, 99))"))); + UPSERT INTO `/Root/table-2` (key, value) VALUES (99, 99); )"), sessionIdBlocker, txIdBlocker, true)); waitFor([&] { return readSets.size() >= 2; }, "2 blocked readsets"); @@ -1427,10 +1363,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { WHERE key in (1, 2, 3) ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); // Now schedule creation of 10 more snapshots for (int i = 0; i < 10; ++i) { @@ -1447,10 +1381,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { WHERE key in (1, 2, 3) ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); // Insert one more row, in a buggy case it would be assigned a version below the snapshot ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)")); @@ -1462,10 +1394,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { WHERE key in (1, 2, 3) ORDER BY key )")), - "Struct { " - "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " - "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " - "} Struct { Bool: false }"); + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); } struct TLockSnapshot { |