diff options
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 105 |
2 files changed, 102 insertions, 12 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 540a4d045c2..4e2c9f829b0 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -616,10 +616,11 @@ private: state.State = TShardState::EState::Finished; YQL_ENSURE(state.DatashardState.Defined()); - YQL_ENSURE(!state.DatashardState->Follower); - - Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward( - new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false)); + //nothing to cancel on follower + if (!state.DatashardState->Follower) { + Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward( + new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false)); + } } } } diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index b0ac3f9dab4..47b94f19b8a 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -761,8 +761,16 @@ Y_UNIT_TEST_SUITE(KqpLimits) { WaitForZeroSessions(counters); } - Y_UNIT_TEST(CancelAfterRoTx) { - TKikimrRunner kikimr; + void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); + + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + TKikimrRunner kikimr(serverSettings); NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); { @@ -772,13 +780,73 @@ Y_UNIT_TEST_SUITE(KqpLimits) { int maxTimeoutMs = 500; bool wasCanceled = false; - for (int i = 1; i <= maxTimeoutMs; i++) { - auto result = session.ExecuteDataQuery(R"( + if (follower) { + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/OneShardWithFolower` ( + Key Uint64, + Text String, + Data Int32, + PRIMARY KEY (Key) + ) + WITH ( + READ_REPLICAS_SETTINGS = "ANY_AZ:1" + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + --!syntax_v1 + REPLACE INTO `/Root/OneShardWithFolower` (Key, Text, Data) VALUES + (101u, "Value1", 1), + (201u, "Value1", 2), + (301u, "Value1", 3), + (401u, "Value1", 1), + (501u, "Value1", 2), + (601u, "Value1", 3), + (701u, "Value1", 1), + (801u, "Value1", 2), + (102u, "Value2", 3), + (202u, "Value2", 1), + (302u, "Value2", 2), + (402u, "Value2", 3), + (502u, "Value2", 1), + (602u, "Value2", 2), + (702u, "Value2", 3), + (802u, "Value2", 1), + (103u, "Value3", 2), + (203u, "Value3", 3), + (303u, "Value3", 1), + (403u, "Value3", 2), + (503u, "Value3", 3), + (603u, "Value3", 1), + (703u, "Value3", 2), + (803u, "Value3", 3); + )", TTxControl::BeginTx().CommitTx()).GetValueSync()); + } + + const TString q = follower ? + (dependedRead ? + TString(R"( + DECLARE $id AS Uint64; + --JOIN with same table to make depended read + SELECT t1.Data as Data, t1.Key as Key, t1.Text as Text FROM `/Root/OneShardWithFolower` as t1 + INNER JOIN OneShardWithFolower as t2 ON t1.Key = t2.Key WHERE t1.Text = "Value1" ORDER BY t1.Key; + )"): + TString(R"( + DECLARE $id AS Uint64; + SELECT * FROM `/Root/OneShardWithFolower` WHERE Text = "Value1" ORDER BY Key + )") + ): + TString(R"( DECLARE $id AS Uint64; - SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; - )", - TTxControl::BeginTx( - TTxSettings::SerializableRW()).CommitTx(), + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key + )"); + + const auto txCtrl = follower ? TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx() : + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + + for (int i = 1; i <= maxTimeoutMs; i++) { + auto result = session.ExecuteDataQuery(q, txCtrl, TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i)) ).GetValueSync(); @@ -803,6 +871,27 @@ Y_UNIT_TEST_SUITE(KqpLimits) { WaitForZeroSessions(counters); } + Y_UNIT_TEST(CancelAfterRoTx) { + // false, false has no sense since we use TEvRead to read without followers + DoCancelAfterRo(false, true, false); + } + + Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacy) { + DoCancelAfterRo(true, false, false); + } + + Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacyDependedRead) { + DoCancelAfterRo(true, false, true); + } + + Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookup) { + DoCancelAfterRo(true, true, false); + } + + Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookupDepededRead) { + DoCancelAfterRo(true, true, true); + } + Y_UNIT_TEST(QueryExecTimeout) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10'000'000'000); |
