aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-03-12 17:13:07 +0300
committerspuchin <spuchin@ydb.tech>2023-03-12 17:13:07 +0300
commit0abce4801507e87f95e23263f0bd91c5d69d3e02 (patch)
tree0d7d9d589a3589692d2fd2230d2d51a737b23d62
parent21b5551c3aee31de7abfb1c8a7a890f5ce124c62 (diff)
downloadydb-0abce4801507e87f95e23263f0bd91c5d69d3e02.tar.gz
Fix unsafe merge of last read tx with commit tx. ()
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp10
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp28
2 files changed, 36 insertions, 2 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 04d7ff110e..a651e6401e 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1128,8 +1128,14 @@ public:
if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) {
// every phy tx should acquire LockTxId, so commit is sent separately at the end
commit = QueryState->CurrentTx >= phyQuery.TransactionsSize();
- } else if (QueryState->Commit) {
- commit = QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1;
+ } else if (QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1) {
+ if (!tx) {
+ // no physical transactions left, perform commit
+ commit = true;
+ } else {
+ // we can merge commit with last tx only for read-only transactions
+ commit = txCtx.DeferredEffects.Empty();
+ }
}
if (tx || commit) {
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index dd10e979f6..cf0165ab18 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -1119,6 +1119,34 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
)", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST(ReadAfterWrite) {
+ auto settings = TKikimrSettings();
+ auto kikimr = TKikimrRunner{settings};
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ UPSERT INTO KeyValue (Key, Value) VALUES (3u, "Three")
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
+ AssertSuccessResult(result);
+
+ auto tx = result.GetTransaction();
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ result = session.ExecuteDataQuery(R"(
+ SELECT Amount FROM Test WHERE Group = 1 ORDER BY Amount DESC;
+ )", TTxControl::Tx(*tx).CommitTx(), execSettings).ExtractValueSync();
+ AssertSuccessResult(result);
+
+ CompareYson(R"([[[3500u]];[[300u]]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ // Commit cannot be merged with physical tx for read-write transactions
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ }
+
Y_UNIT_TEST(PrunePartitionsByLiteral) {
TKikimrSettings settings;
TKikimrRunner kikimr(settings);