summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp9
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp105
2 files changed, 102 insertions, 12 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 540a4d045c2..4e2c9f829b0 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -616,10 +616,11 @@ private:
state.State = TShardState::EState::Finished;
YQL_ENSURE(state.DatashardState.Defined());
- YQL_ENSURE(!state.DatashardState->Follower);
-
- Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward(
- new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false));
+ //nothing to cancel on follower
+ if (!state.DatashardState->Follower) {
+ Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward(
+ new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false));
+ }
}
}
}
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index b0ac3f9dab4..47b94f19b8a 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -761,8 +761,16 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
WaitForZeroSessions(counters);
}
- Y_UNIT_TEST(CancelAfterRoTx) {
- TKikimrRunner kikimr;
+ void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
+
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+
+ TKikimrRunner kikimr(serverSettings);
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
{
@@ -772,13 +780,73 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
int maxTimeoutMs = 500;
bool wasCanceled = false;
- for (int i = 1; i <= maxTimeoutMs; i++) {
- auto result = session.ExecuteDataQuery(R"(
+ if (follower) {
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/OneShardWithFolower` (
+ Key Uint64,
+ Text String,
+ Data Int32,
+ PRIMARY KEY (Key)
+ )
+ WITH (
+ READ_REPLICAS_SETTINGS = "ANY_AZ:1"
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ REPLACE INTO `/Root/OneShardWithFolower` (Key, Text, Data) VALUES
+ (101u, "Value1", 1),
+ (201u, "Value1", 2),
+ (301u, "Value1", 3),
+ (401u, "Value1", 1),
+ (501u, "Value1", 2),
+ (601u, "Value1", 3),
+ (701u, "Value1", 1),
+ (801u, "Value1", 2),
+ (102u, "Value2", 3),
+ (202u, "Value2", 1),
+ (302u, "Value2", 2),
+ (402u, "Value2", 3),
+ (502u, "Value2", 1),
+ (602u, "Value2", 2),
+ (702u, "Value2", 3),
+ (802u, "Value2", 1),
+ (103u, "Value3", 2),
+ (203u, "Value3", 3),
+ (303u, "Value3", 1),
+ (403u, "Value3", 2),
+ (503u, "Value3", 3),
+ (603u, "Value3", 1),
+ (703u, "Value3", 2),
+ (803u, "Value3", 3);
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync());
+ }
+
+ const TString q = follower ?
+ (dependedRead ?
+ TString(R"(
+ DECLARE $id AS Uint64;
+ --JOIN with same table to make depended read
+ SELECT t1.Data as Data, t1.Key as Key, t1.Text as Text FROM `/Root/OneShardWithFolower` as t1
+ INNER JOIN OneShardWithFolower as t2 ON t1.Key = t2.Key WHERE t1.Text = "Value1" ORDER BY t1.Key;
+ )"):
+ TString(R"(
+ DECLARE $id AS Uint64;
+ SELECT * FROM `/Root/OneShardWithFolower` WHERE Text = "Value1" ORDER BY Key
+ )")
+ ):
+ TString(R"(
DECLARE $id AS Uint64;
- SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
- )",
- TTxControl::BeginTx(
- TTxSettings::SerializableRW()).CommitTx(),
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key
+ )");
+
+ const auto txCtrl = follower ? TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx() :
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto result = session.ExecuteDataQuery(q, txCtrl,
TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
).GetValueSync();
@@ -803,6 +871,27 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
WaitForZeroSessions(counters);
}
+ Y_UNIT_TEST(CancelAfterRoTx) {
+ // false, false has no sense since we use TEvRead to read without followers
+ DoCancelAfterRo(false, true, false);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacy) {
+ DoCancelAfterRo(true, false, false);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacyDependedRead) {
+ DoCancelAfterRo(true, false, true);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookup) {
+ DoCancelAfterRo(true, true, false);
+ }
+
+ Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookupDepededRead) {
+ DoCancelAfterRo(true, true, true);
+ }
+
Y_UNIT_TEST(QueryExecTimeout) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10'000'000'000);