aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-07-15 16:23:37 +0300
committerspuchin <spuchin@ydb.tech>2022-07-15 16:23:37 +0300
commit906329d74fff26eea56ae714cc2f47a185041c8a (patch)
treed80792beb73032937f055954304324bde0bf60fb
parentf4cc982cc70e46fd5948bc2c2c681781306392fd (diff)
downloadydb-906329d74fff26eea56ae714cc2f47a185041c8a.tar.gz
Fix missing InvisibleRowSkips update on empty ranges. ()
-rw-r--r--ydb/core/kqp/ut/kqp_locks_ut.cpp88
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp16
2 files changed, 100 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/kqp_locks_ut.cpp b/ydb/core/kqp/ut/kqp_locks_ut.cpp
index bd9d9ede5b2..bc6dbd94371 100644
--- a/ydb/core/kqp/ut/kqp_locks_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_locks_ut.cpp
@@ -114,6 +114,94 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
}
+
+ Y_UNIT_TEST_NEW_ENGINE(EmptyRange) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+ )"), TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+
+ result = session2.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+
+ UPSERT INTO Test (Group, Name, Amount) VALUES
+ (11, "Session2", 2);
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+
+ result = session1.ExecuteDataQuery(Q1_(R"(
+ UPSERT INTO Test (Group, Name, Amount) VALUES
+ (11, "Session1", 1);
+ )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
+ [] (const NYql::TIssue& issue) {
+ return issue.Message.Contains("/Root/Test");
+ }));
+
+ result = session1.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST_NEW_ENGINE(EmptyRangeAlreadyBroken) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 10;
+ )"), TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+
+ result = session2.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+
+ UPSERT INTO Test (Group, Name, Amount) VALUES
+ (11, "Session2", 2);
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+
+ result = session1.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+
+ UPSERT INTO Test (Group, Name, Amount) VALUES
+ (11, "Session1", 1);
+ )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
+ [] (const NYql::TIssue& issue) {
+ return issue.Message.Contains("/Root/Test");
+ }));
+
+ result = session1.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM Test WHERE Group = 11;
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKqp
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index a557d87a2b9..0e10561ab67 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -530,9 +530,13 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl
stats.SelectRangeRows = 1;
stats.SelectRangeBytes = rowSize;
- stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
- stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+ break;
+ }
+
+ stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
+ stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+ if (iterator.Last() == NTable::EReady::Data) {
return true;
}
@@ -577,9 +581,13 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead
stats.SelectRangeRows = 1;
stats.SelectRangeBytes = rowSize;
- stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
- stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+ break;
+ }
+
+ stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
+ stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+ if (iterator.Last() == NTable::EReady::Data) {
return true;
}