diff options
author | spuchin <spuchin@ydb.tech> | 2022-07-15 16:23:37 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-07-15 16:23:37 +0300 |
commit | 906329d74fff26eea56ae714cc2f47a185041c8a (patch) | |
tree | d80792beb73032937f055954304324bde0bf60fb | |
parent | f4cc982cc70e46fd5948bc2c2c681781306392fd (diff) | |
download | ydb-906329d74fff26eea56ae714cc2f47a185041c8a.tar.gz |
Fix missing InvisibleRowSkips update on empty ranges. ()
-rw-r--r-- | ydb/core/kqp/ut/kqp_locks_ut.cpp | 88 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 16 |
2 files changed, 100 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/kqp_locks_ut.cpp b/ydb/core/kqp/ut/kqp_locks_ut.cpp index bd9d9ede5b2..bc6dbd94371 100644 --- a/ydb/core/kqp/ut/kqp_locks_ut.cpp +++ b/ydb/core/kqp/ut/kqp_locks_ut.cpp @@ -114,6 +114,94 @@ Y_UNIT_TEST_SUITE(KqpLocks) { )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); UNIT_ASSERT(result.IsSuccess()); } + + Y_UNIT_TEST_NEW_ENGINE(EmptyRange) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session2", 2); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteDataQuery(Q1_(R"( + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session1", 1); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.Message.Contains("/Root/Test"); + })); + + result = session1.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST_NEW_ENGINE(EmptyRangeAlreadyBroken) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 10; + )"), TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session2", 2); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + + UPSERT INTO Test (Group, Name, Amount) VALUES + (11, "Session1", 1); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.Message.Contains("/Root/Test"); + })); + + result = session1.ExecuteDataQuery(Q1_(R"( + SELECT * FROM Test WHERE Group = 11; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKqp diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index a557d87a2b9..0e10561ab67 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -530,9 +530,13 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl stats.SelectRangeRows = 1; stats.SelectRangeBytes = rowSize; - stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); - stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + break; + } + + stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); + stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + if (iterator.Last() == NTable::EReady::Data) { return true; } @@ -577,9 +581,13 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead stats.SelectRangeRows = 1; stats.SelectRangeBytes = rowSize; - stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); - stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + break; + } + + stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); + stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + if (iterator.Last() == NTable::EReady::Data) { return true; } |