1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
#pragma once
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/MergeTreeTransactionHolder.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ThreadPool.h>
#include <boost/noncopyable.hpp>
#include <mutex>
#include <unordered_map>
namespace DB
{
/// We want to create a TransactionLog object lazily and avoid creation if it's not needed.
/// But we also want to call shutdown() in a specific place to avoid race conditions.
/// We cannot simply use the return-static-variable pattern,
/// because a call to shutdown() could construct an unnecessary object in that case.
template <typename Derived>
class SingletonHelper : private boost::noncopyable
{
public:
    /// Returns the singleton, creating it on first use.
    /// When the instance already exists this is a single atomic load, without taking the mutex.
    static Derived & instance()
    {
        if (Derived * existing = instance_raw_ptr.load(); likely(existing))
            return *existing;
        return createInstanceOrThrow();
    }

    /// Calls shutdown() on the singleton only if it has already been created.
    /// Never constructs an instance.
    static void shutdownIfAny()
    {
        std::lock_guard guard{instance_mutex};
        if (!instance_holder)
            return;
        instance_holder->shutdown();
    }

private:
    /// Slow path of instance(): creates the object under instance_mutex if it does not exist yet.
    static Derived & createInstanceOrThrow();

    /// Non-owning pointer to the object held by instance_holder; read by instance() without locking.
    static inline std::atomic<Derived *> instance_raw_ptr;
    /// It was supposed to be std::optional, but gcc fails to compile it for some reason
    static inline std::shared_ptr<Derived> instance_holder;
    /// Serializes creation of the instance and the call made by shutdownIfAny().
    static inline std::mutex instance_mutex;
};
/// Forward declaration: only the name is needed here to form the shared-pointer alias below.
class TransactionsInfoLog;
/// Shared-ownership pointer to TransactionsInfoLog.
using TransactionsInfoLogPtr = std::shared_ptr<TransactionsInfoLog>;
/// Shared-ownership pointer to zkutil::ZooKeeper.
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
/// This class maintains transaction log in ZooKeeper and a list of currently running transactions in memory.
///
/// Each transaction has unique ID (TID, see details below).
/// TransactionID is allocated when transaction begins.
///
/// We use TransactionID to associate changes (created/removed data parts) with transaction that has made/is going to make these changes.
/// To commit a transaction we create sequential node "/path_to_log/log/csn-" in ZK and write TID into this node.
/// Allocated sequential number is a commit timestamp or Commit Sequence Number (CSN). It indicates a (logical) point in time
/// when transaction is committed and all its changes became visible. So we have total order of all changes.
///
/// Also CSNs are used as snapshots: all changes that were made by a transaction that was committed with a CSN less than or equal to some_csn
/// are visible in some_csn snapshot.
///
/// TransactionID consists of three parts: (start_csn, local_tid, host_id)
/// - start_csn is the newest CSN that existed when the transaction was started; it is also the snapshot that is visible to this transaction
/// - local_tid is local sequential number of the transaction, each server allocates local_tids independently without requests to ZK
/// - host_id is persistent UUID of host that has started the transaction, it's kind of tie-breaker that makes ID unique across all servers
///
/// To check if some transaction is committed or not we fetch "csn-xxxxxx" nodes from ZK and construct TID -> CSN mapping,
/// so for committed transactions we know commit timestamps.
/// However, if we did not find a mapping for some TID, it means one of the following cases:
/// 1. Transaction is not committed (yet)
/// 2. Transaction is rolled back (quite similar to the first case, but it will never be committed)
/// 3. Transaction was committed a long time ago and we removed its entry from the log
/// To distinguish the third case we store a "tail pointer" in "/path_to_log/tail_ptr". It's a CSN such that it's safe to remove from log
/// entries with tid.start_csn < tail_ptr, because CSNs for those TIDs are already written into data parts
/// and we will not do a CSN lookup for those TIDs anymore.
///
/// (however, transactions involving multiple hosts and/or ReplicatedMergeTree tables are currently not supported)
class TransactionLog final : public SingletonHelper<TransactionLog>
{
public:
/// Constructor and destructor are defined out of line (not visible in this header).
TransactionLog();
~TransactionLog();
/// Called by SingletonHelper::shutdownIfAny() on the existing instance, if there is one.
/// NOTE(review): presumably sets stop_flag and stops updating_thread - confirm in the implementation.
void shutdown();
/// Returns the newest snapshot available for reading
CSN getLatestSnapshot() const;
/// Returns the oldest snapshot that is visible for some running transaction
CSN getOldestSnapshot() const;
/// Allocates TID, returns new transaction object
MergeTreeTransactionPtr beginTransaction();
/// Tries to commit transaction. Returns Commit Sequence Number.
/// Throws if transaction was concurrently killed or if some precommit check failed.
/// May throw if ZK connection is lost. Transaction status is unknown in this case.
/// Returns CommittingCSN if throw_on_unknown_status is false and connection was lost.
CSN commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status);
/// Releases locks that were acquired by transaction, releases snapshot, removes transaction from the list of active transactions.
/// Normally it should not throw, but if it does for some reason (global memory limit exceeded, disk failure, etc)
/// then we should terminate server and reinitialize it to avoid corruption of data structures. That's why it's noexcept.
void rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept;
/// Returns CSN if transaction with specified ID was committed and UnknownCSN if it was not.
/// Returns PrehistoricCSN for PrehistoricTID without creating a TransactionLog instance as a special case.
/// A transaction may be committed concurrently; to resolve that case, pass failback_with_strict_load_csn
/// (optional, defaults to nullptr).
static CSN getCSN(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
/// Same as above, but takes the hash of the TID.
static CSN getCSN(const TIDHash & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
/// Variant of getCSN() for which failback_with_strict_load_csn is mandatory (taken by reference).
/// NOTE(review): presumably also checks that the returned CSN is known - confirm in the implementation.
static CSN getCSNAndAssert(const TransactionID & tid, std::atomic<CSN> & failback_with_strict_load_csn);
/// Ensures that getCSN returned UnknownCSN because transaction is not committed and not because entry was removed from the log.
static void assertTIDIsNotOutdated(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
/// Returns a pointer to transaction object if it's running or nullptr.
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
/// Running transactions keyed by TID hash.
using TransactionsList = std::unordered_map<TIDHash, MergeTreeTransactionPtr>;
/// Returns copy of list of running transactions.
TransactionsList getTransactionsList() const;
/// Waits for provided CSN (and all previous ones) to be loaded from the log.
/// Returns false if waiting was interrupted (e.g. by shutdown)
bool waitForCSNLoaded(CSN csn) const;
/// Returns the current value of stop_flag.
bool isShuttingDown() const { return stop_flag.load(); }
/// NOTE(review): presumably brings the in-memory state up to date with the log in ZooKeeper - confirm in the implementation.
void sync() const;
private:
/// The caller must hold `mutex` (see the TSA_REQUIRES annotation).
void loadLogFromZooKeeper() TSA_REQUIRES(mutex);
/// NOTE(review): presumably the function executed by updating_thread - confirm in the implementation.
void runUpdatingThread();
/// Loads the log entries named by the range [beg, end).
/// NOTE(review): the strings are presumably names of "csn-" nodes - confirm against callers.
void loadEntries(Strings::const_iterator beg, Strings::const_iterator end);
void loadNewEntries();
void removeOldEntries();
/// Declared noexcept. NOTE(review): the role of state_guard is not visible in this header - see the implementation.
CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept;
/// NOTE(review): presumably processes the entries of unknown_state_list - confirm in the implementation.
void tryFinalizeUnknownStateTransactions();
/// Conversions between CSN / TransactionID values and strings.
/// Judging by the parameter names, a CSN is parsed from a node name and a TID from node content.
static UInt64 deserializeCSN(const String & csn_node_name);
static String serializeCSN(CSN csn);
static TransactionID deserializeTID(const String & csn_node_content);
static String serializeTID(const TransactionID & tid);
ZooKeeperPtr getZooKeeper() const;
/// A transaction may be committed concurrently; to resolve that case, pass failback_with_strict_load_csn
/// (optional, defaults to nullptr).
CSN getCSNImpl(const TIDHash & tid_hash, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr) const;
const ContextPtr global_context;
Poco::Logger * const log;
/// The newest snapshot available for reading
std::atomic<CSN> latest_snapshot;
/// Local part of TransactionID number. We reset this counter for each new snapshot.
std::atomic<LocalTID> local_tid_counter;
/// Guards tid_to_csn, zookeeper and last_loaded_entry (see the TSA_GUARDED_BY annotations below).
mutable std::mutex mutex;
/// Mapping from TransactionID to CSN for recently committed transactions.
/// Allows to check if some transaction is committed.
struct CSNEntry
{
CSN csn;
TransactionID tid;
};
using TIDMap = std::unordered_map<TIDHash, CSNEntry>;
TIDMap tid_to_csn TSA_GUARDED_BY(mutex);
/// Guards running_list, unknown_state_list, unknown_state_list_loaded and snapshots_in_use
/// (see the TSA_GUARDED_BY annotations below).
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list TSA_GUARDED_BY(running_list_mutex);
/// If we lost connection on attempt to create csn- node then we don't know transaction's state.
using UnknownStateList = std::vector<std::pair<MergeTreeTransactionPtr, scope_guard>>;
UnknownStateList unknown_state_list TSA_GUARDED_BY(running_list_mutex);
/// NOTE(review): how this list differs from unknown_state_list is not visible in this header - see the implementation.
UnknownStateList unknown_state_list_loaded TSA_GUARDED_BY(running_list_mutex);
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use TSA_GUARDED_BY(running_list_mutex);
ZooKeeperPtr zookeeper TSA_GUARDED_BY(mutex);
/// NOTE(review): presumably the "/path_to_log" root and its "log" child described in the class comment - confirm in the constructor.
const String zookeeper_path;
const String zookeeper_path_log;
/// Name of the newest entry that was loaded from log in ZK
String last_loaded_entry TSA_GUARDED_BY(mutex);
/// The oldest CSN such that we store in log entries with TransactionIDs containing this CSN.
std::atomic<CSN> tail_ptr = Tx::UnknownCSN;
/// NOTE(review): presumably signaled when the log in ZooKeeper changes - confirm in the implementation.
zkutil::EventPtr log_updated_event = std::make_shared<Poco::Event>();
/// Read by isShuttingDown(); initially false.
std::atomic_bool stop_flag = false;
ThreadFromGlobalPool updating_thread;
/// Both default to 0.
/// NOTE(review): presumably fault-injection probabilities around the commit step, used for testing - confirm in the implementation.
const Float64 fault_probability_before_commit = 0;
const Float64 fault_probability_after_commit = 0;
};
/// Slow path of SingletonHelper::instance(): under instance_mutex, constructs the
/// Derived object if it does not exist yet and publishes its address through
/// instance_raw_ptr. Always returns a reference to the held instance.
/// An exception thrown by Derived's constructor propagates to the caller and leaves
/// both instance_holder and instance_raw_ptr unset.
template <typename Derived>
Derived & SingletonHelper<Derived>::createInstanceOrThrow()
{
    std::lock_guard guard{instance_mutex};

    /// Another thread may have created the instance while we were waiting for the mutex.
    if (instance_holder)
        return *instance_holder;

    instance_holder = std::make_shared<Derived>();
    instance_raw_ptr.store(instance_holder.get());
    return *instance_holder;
}
}
|