diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-15 21:32:16 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-15 21:32:16 +0300 |
commit | da491339a4df97737e42147dc6b0f4672e679b4e (patch) | |
tree | 59f69c768928d568adaa4bb2601aad08f20c195f | |
parent | 1a3ac975fcaaeb1876b2487ca6ed8fa3fd170d9f (diff) | |
download | ydb-da491339a4df97737e42147dc6b0f4672e679b4e.tar.gz |
Locks checking in KqpSessionActor
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_tx_ut.cpp | 37 |
2 files changed, 54 insertions, 18 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index c18ffb02f5..5189cf835c 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -242,10 +242,8 @@ public: TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); } else { - QueryState->TxCtx = txCtx; txCtx->Invalidate(); - TransactionsToBeAborted.emplace_back(txCtx); - RemoveTransaction(txId); + InvalidateExplicitTransaction(txCtx, txId); ReplySuccess(); } @@ -963,10 +961,6 @@ public: auto& txCtx = *QueryState->TxCtx; auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - if (!CheckTransacionLocks()) { - return; - } - const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); YQL_ENSURE(QueryState->CurrentTx < phyQuery.TransactionsSize()); @@ -978,8 +972,9 @@ public: ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Failed to mix queries with old- and new- engines"; } + LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); + if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { - LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); ++QueryState->CurrentTx; tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); @@ -988,6 +983,9 @@ public: break; } } + if (!CheckTransacionLocks()) { + return; + } bool commit = QueryState->Commit && QueryState->CurrentTx == phyQuery.TransactionsSize() - 1; if (tx || commit) { @@ -1008,10 +1006,6 @@ public: LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); - if (!CheckTransacionLocks()) { - return true; - } - // TODO Handle timeouts -- request.Timeout, request.CancelAfter if (tx) { @@ -1150,10 +1144,7 @@ public: LOG_I(SelfId() << " " << requestInfo << " TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx << " response->DebugString(): " << response->DebugString()); - auto& txCtx = QueryState->TxCtx; - txCtx->Invalidate(); - TransactionsToBeAborted.emplace_back(txCtx); - RemoveTransaction(QueryState->TxId); + QueryState->TxCtx->Invalidate(); auto status = response->GetStatus(); TIssues issues; @@ -1535,8 +1526,7 @@ public: auto txCtx = FindTransaction(txId); if (txCtx) { txCtx->Invalidate(); - TransactionsToBeAborted.emplace_back(txCtx); - RemoveTransaction(txId); + InvalidateExplicitTransaction(txCtx, txId); } txId = CreateEmptyULID(); txId_Human = ""; @@ -1775,9 +1765,18 @@ public: Cleanup(true); } + void InvalidateExplicitTransaction(TIntrusivePtr<TKqpTransactionContext> txCtx, const TULID& txId) { + TransactionsToBeAborted.emplace_back(std::move(txCtx)); + RemoveTransaction(txId); + } + void Cleanup(bool isFinal = false) { isFinal = isFinal || !QueryState->KeepSession; + if (QueryState && QueryState->TxCtx && QueryState->TxCtx->IsInvalidated()) { + InvalidateExplicitTransaction(QueryState->TxCtx, QueryState->TxId); + } + if (isFinal) Counters->ReportSessionActorClosedRequest(Settings.DbCounters); diff --git a/ydb/core/kqp/ut/kqp_tx_ut.cpp b/ydb/core/kqp/ut/kqp_tx_ut.cpp index 0fa86cde1e..26eb2591b7 100644 --- a/ydb/core/kqp/ut/kqp_tx_ut.cpp +++ b/ydb/core/kqp/ut/kqp_tx_ut.cpp @@ -81,6 +81,43 @@ Y_UNIT_TEST_SUITE(KqpTx) { UNIT_ASSERT(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND)); } + Y_UNIT_TEST_QUAD(LocksAbortOnCommit, UseNewEngine, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine && UseSessionActor); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + { + auto result = session.ExecuteDataQuery(Q_(R"( + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, "One"); + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (2, "Two"); + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (3, "Three"); + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (4, "Four"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + + auto result = session.ExecuteDataQuery(Q_(R"( + SELECT * FROM `/Root/KeyValue`; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteDataQuery(Q_(R"( + UPDATE `/Root/KeyValue` SET Value = "second" WHERE Key = 3; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session.ExecuteDataQuery(Q_(R"( + UPDATE `/Root/KeyValue` SET Value = "third" WHERE Key = 4; + )"), TTxControl::Tx(*tx)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + + auto commitResult = tx->Commit().ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); + } + Y_UNIT_TEST_NEW_ENGINE(InteractiveTx) { auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine); auto db = kikimr.GetTableClient(); |