aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-09-15 21:32:16 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-09-15 21:32:16 +0300
commitda491339a4df97737e42147dc6b0f4672e679b4e (patch)
tree59f69c768928d568adaa4bb2601aad08f20c195f
parent1a3ac975fcaaeb1876b2487ca6ed8fa3fd170d9f (diff)
downloadydb-da491339a4df97737e42147dc6b0f4672e679b4e.tar.gz
Locks checking in KqpSessionActor
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp35
-rw-r--r--ydb/core/kqp/ut/kqp_tx_ut.cpp37
2 files changed, 54 insertions, 18 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index c18ffb02f5..5189cf835c 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -242,10 +242,8 @@ public:
TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
} else {
- QueryState->TxCtx = txCtx;
txCtx->Invalidate();
- TransactionsToBeAborted.emplace_back(txCtx);
- RemoveTransaction(txId);
+ InvalidateExplicitTransaction(txCtx, txId);
ReplySuccess();
}
@@ -963,10 +961,6 @@ public:
auto& txCtx = *QueryState->TxCtx;
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- if (!CheckTransacionLocks()) {
- return;
- }
-
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
YQL_ENSURE(QueryState->CurrentTx < phyQuery.TransactionsSize());
@@ -978,8 +972,9 @@ public:
ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
<< "Failed to mix queries with old- and new- engines";
}
+ LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
+
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
- LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
++QueryState->CurrentTx;
tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
&phyQuery.GetTransactions(QueryState->CurrentTx));
@@ -988,6 +983,9 @@ public:
break;
}
}
+ if (!CheckTransacionLocks()) {
+ return;
+ }
bool commit = QueryState->Commit && QueryState->CurrentTx == phyQuery.TransactionsSize() - 1;
if (tx || commit) {
@@ -1008,10 +1006,6 @@ public:
LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
- if (!CheckTransacionLocks()) {
- return true;
- }
-
// TODO Handle timeouts -- request.Timeout, request.CancelAfter
if (tx) {
@@ -1150,10 +1144,7 @@ public:
LOG_I(SelfId() << " " << requestInfo << " TEvTxResponse has non-success status, CurrentTx: "
<< QueryState->CurrentTx << " response->DebugString(): " << response->DebugString());
- auto& txCtx = QueryState->TxCtx;
- txCtx->Invalidate();
- TransactionsToBeAborted.emplace_back(txCtx);
- RemoveTransaction(QueryState->TxId);
+ QueryState->TxCtx->Invalidate();
auto status = response->GetStatus();
TIssues issues;
@@ -1535,8 +1526,7 @@ public:
auto txCtx = FindTransaction(txId);
if (txCtx) {
txCtx->Invalidate();
- TransactionsToBeAborted.emplace_back(txCtx);
- RemoveTransaction(txId);
+ InvalidateExplicitTransaction(txCtx, txId);
}
txId = CreateEmptyULID();
txId_Human = "";
@@ -1775,9 +1765,18 @@ public:
Cleanup(true);
}
+ void InvalidateExplicitTransaction(TIntrusivePtr<TKqpTransactionContext> txCtx, const TULID& txId) {
+ TransactionsToBeAborted.emplace_back(std::move(txCtx));
+ RemoveTransaction(txId);
+ }
+
void Cleanup(bool isFinal = false) {
isFinal = isFinal || !QueryState->KeepSession;
+ if (QueryState && QueryState->TxCtx && QueryState->TxCtx->IsInvalidated()) {
+ InvalidateExplicitTransaction(QueryState->TxCtx, QueryState->TxId);
+ }
+
if (isFinal)
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
diff --git a/ydb/core/kqp/ut/kqp_tx_ut.cpp b/ydb/core/kqp/ut/kqp_tx_ut.cpp
index 0fa86cde1e..26eb2591b7 100644
--- a/ydb/core/kqp/ut/kqp_tx_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_tx_ut.cpp
@@ -81,6 +81,43 @@ Y_UNIT_TEST_SUITE(KqpTx) {
UNIT_ASSERT(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND));
}
+ Y_UNIT_TEST_QUAD(LocksAbortOnCommit, UseNewEngine, UseSessionActor) {
+ auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine && UseSessionActor);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ {
+ auto result = session.ExecuteDataQuery(Q_(R"(
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, "One");
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (2, "Two");
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (3, "Three");
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (4, "Four");
+ )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+
+ auto result = session.ExecuteDataQuery(Q_(R"(
+ SELECT * FROM `/Root/KeyValue`;
+ )"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx = result.GetTransaction();
+
+ result = session.ExecuteDataQuery(Q_(R"(
+ UPDATE `/Root/KeyValue` SET Value = "second" WHERE Key = 3;
+ )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = session.ExecuteDataQuery(Q_(R"(
+ UPDATE `/Root/KeyValue` SET Value = "third" WHERE Key = 4;
+ )"), TTxControl::Tx(*tx)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+
+ auto commitResult = tx->Commit().ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString());
+ }
+
Y_UNIT_TEST_NEW_ENGINE(InteractiveTx) {
auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine);
auto db = kikimr.GetTableClient();