1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
#include <Interpreters/InterpreterTransactionControlQuery.h>
#include <Parsers/ASTTransactionControl.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_TRANSACTION;
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
BlockIO InterpreterTransactionControlQuery::execute()
{
if (!query_context->hasSessionContext())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction Control Language queries are allowed only inside session");
ContextMutablePtr session_context = query_context->getSessionContext();
const auto & tcl = query_ptr->as<const ASTTransactionControl &>();
switch (tcl.action)
{
case ASTTransactionControl::BEGIN:
return executeBegin(session_context);
case ASTTransactionControl::COMMIT:
return executeCommit(session_context);
case ASTTransactionControl::ROLLBACK:
return executeRollback(session_context);
case ASTTransactionControl::SET_SNAPSHOT:
return executeSetSnapshot(session_context, tcl.snapshot);
}
UNREACHABLE();
}
BlockIO InterpreterTransactionControlQuery::executeBegin(ContextMutablePtr session_context)
{
if (session_context->getCurrentTransaction())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Nested transactions are not supported");
session_context->checkTransactionsAreAllowed(/* explicit_tcl_query = */ true);
auto txn = TransactionLog::instance().beginTransaction();
session_context->initCurrentTransaction(txn);
query_context->setCurrentTransaction(txn);
return {};
}
BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr session_context)
{
auto txn = session_context->getCurrentTransaction();
if (!txn)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
if (txn->getState() != MergeTreeTransaction::RUNNING)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state");
TransactionsWaitCSNMode mode = query_context->getSettingsRef().wait_changes_become_visible_after_commit_mode;
CSN csn;
try
{
csn = TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ mode != TransactionsWaitCSNMode::WAIT_UNKNOWN);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION)
{
/// Detach transaction from current context if connection was lost and its status is unknown
/// (so it will be possible to start new one)
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
}
throw;
}
if (csn == Tx::CommittingCSN)
{
chassert(mode == TransactionsWaitCSNMode::WAIT_UNKNOWN);
/// Try to wait for connection to be restored and its status to be loaded.
/// It's useful for testing. It allows to enable fault injection (after commit) without breaking tests.
txn->waitStateChange(Tx::CommittingCSN);
CSN csn_changed_state = txn->getCSN();
if (csn_changed_state == Tx::UnknownCSN)
{
/// CommittingCSN -> UnknownCSN -> RolledBackCSN
/// It's possible if connection was lost before commit
/// (maybe we should get rid of intermediate UnknownCSN in this transition)
txn->waitStateChange(Tx::UnknownCSN);
chassert(txn->getCSN() == Tx::RolledBackCSN);
}
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid);
if (txn->getState() != MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction {} has invalid state {}", txn->tid, txn->getState());
csn = txn->getCSN();
}
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
if (mode != TransactionsWaitCSNMode::ASYNC)
TransactionLog::instance().waitForCSNLoaded(csn);
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
return {};
}
BlockIO InterpreterTransactionControlQuery::executeRollback(ContextMutablePtr session_context)
{
auto txn = session_context->getCurrentTransaction();
if (!txn)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
if (txn->getState() == MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state");
if (txn->getState() == MergeTreeTransaction::COMMITTING)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTING state");
if (txn->getState() == MergeTreeTransaction::RUNNING)
TransactionLog::instance().rollbackTransaction(txn);
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
return {};
}
BlockIO InterpreterTransactionControlQuery::executeSetSnapshot(ContextMutablePtr session_context, UInt64 snapshot)
{
auto txn = session_context->getCurrentTransaction();
if (!txn)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
if (snapshot <= Tx::MaxReservedCSN && snapshot != Tx::PrehistoricCSN && snapshot != Tx::EverythingVisibleCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot set snapshot to reserved CSN");
txn->setSnapshot(snapshot);
return {};
}
}
|