1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#include <Storages/MergeTree/MutatePlainMergeTreeTask.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/TransactionLog.h>
#include <Common/ProfileEventsScope.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageID MutatePlainMergeTreeTask::getStorageID() const
{
return storage.getStorageID();
}
void MutatePlainMergeTreeTask::onCompleted()
{
bool delay = state == State::SUCCESS;
task_result_callback(delay);
}
void MutatePlainMergeTreeTask::prepare()
{
future_part = merge_mutate_entry->future_part;
task_context = createTaskContext();
merge_list_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_part,
task_context);
stopwatch = std::make_unique<Stopwatch>();
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
mutate_task.reset();
storage.writePartLog(
PartLogElement::MUTATE_PART,
execution_status,
stopwatch->elapsed(),
future_part->name,
new_part,
future_part->parts,
merge_list_entry.get(),
std::move(profile_counters_snapshot));
};
if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
{
/// If we have a prepared sets cache for this mutations, we will use it.
auto mutation_id = future_part->part_info.mutation;
auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
}
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
}
bool MutatePlainMergeTreeTask::executeStep()
{
/// Metrics will be saved in the local profile_counters.
ProfileEventsScope profile_events_scope(&profile_counters);
/// Make out memory tracker a parent of current thread memory tracker
std::optional<ThreadGroupSwitcher> switcher;
if (merge_list_entry)
switcher.emplace((*merge_list_entry)->thread_group);
switch (state)
{
case State::NEED_PREPARE:
{
prepare();
state = State::NEED_EXECUTE;
return true;
}
case State::NEED_EXECUTE:
{
try
{
if (mutate_task->execute())
return true;
new_part = mutate_task->getFuture().get();
auto & data_part_storage = new_part->getDataPartStorage();
if (data_part_storage.hasActiveTransaction())
data_part_storage.precommitTransaction();
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
storage.renameTempPartAndReplace(new_part, transaction);
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");
write_part_log({});
state = State::NEED_FINISH;
return true;
}
catch (...)
{
if (merge_mutate_entry->txn)
merge_mutate_entry->txn->onException();
PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
LOG_ERROR(&Poco::Logger::get("MutatePlainMergeTreeTask"), exception_message);
storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
write_part_log(ExecutionStatus::fromCurrentException("", true));
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;
}
}
case State::NEED_FINISH:
{
// Nothing to do
state = State::SUCCESS;
return false;
}
case State::SUCCESS:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task with state SUCCESS mustn't be executed again");
}
}
return false;
}
ContextMutablePtr MutatePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
context->makeQueryContext();
auto queryId = getQueryId();
context->setCurrentQueryId(queryId);
return context;
}
}
|