diff options
author | spuchin <spuchin@ydb.tech> | 2023-03-12 17:13:07 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-03-12 17:13:07 +0300 |
commit | 0abce4801507e87f95e23263f0bd91c5d69d3e02 (patch) | |
tree | 0d7d9d589a3589692d2fd2230d2d51a737b23d62 | |
parent | 21b5551c3aee31de7abfb1c8a7a890f5ce124c62 (diff) | |
download | ydb-0abce4801507e87f95e23263f0bd91c5d69d3e02.tar.gz |
Fix unsafe merge of last read tx with commit tx. ()
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 28 |
2 files changed, 36 insertions, 2 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 04d7ff110e..a651e6401e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1128,8 +1128,14 @@ public: if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) { // every phy tx should acquire LockTxId, so commit is sent separately at the end commit = QueryState->CurrentTx >= phyQuery.TransactionsSize(); - } else if (QueryState->Commit) { - commit = QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1; + } else if (QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1) { + if (!tx) { + // no physical transactions left, perform commit + commit = true; + } else { + // we can merge commit with last tx only for read-only transactions + commit = txCtx.DeferredEffects.Empty(); + } } if (tx || commit) { diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index dd10e979f6..cf0165ab18 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -1119,6 +1119,34 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { )", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST(ReadAfterWrite) { + auto settings = TKikimrSettings(); + auto kikimr = TKikimrRunner{settings}; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + UPSERT INTO KeyValue (Key, Value) VALUES (3u, "Three") + )", TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + AssertSuccessResult(result); + + auto tx = result.GetTransaction(); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + result = session.ExecuteDataQuery(R"( + SELECT Amount FROM Test WHERE Group = 1 ORDER BY Amount DESC; + )", TTxControl::Tx(*tx).CommitTx(), execSettings).ExtractValueSync(); + AssertSuccessResult(result); + + CompareYson(R"([[[3500u]];[[300u]]])", FormatResultSetYson(result.GetResultSet(0))); + + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + // Commit cannot be merged with physical tx for read-write transactions + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + } + Y_UNIT_TEST(PrunePartitionsByLiteral) { TKikimrSettings settings; TKikimrRunner kikimr(settings); |