aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp')
-rw-r--r--contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp145
1 files changed, 145 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
new file mode 100644
index 0000000000..bf8e879e3d
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
@@ -0,0 +1,145 @@
+#include <Storages/MergeTree/MutatePlainMergeTreeTask.h>
+
+#include <Storages/StorageMergeTree.h>
+#include <Interpreters/TransactionLog.h>
+#include <Common/ProfileEventsScope.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+}
+
+
+StorageID MutatePlainMergeTreeTask::getStorageID() const
+{
+ return storage.getStorageID();
+}
+
+void MutatePlainMergeTreeTask::onCompleted()
+{
+ bool delay = state == State::SUCCESS;
+ task_result_callback(delay);
+}
+
+void MutatePlainMergeTreeTask::prepare()
+{
+ future_part = merge_mutate_entry->future_part;
+
+ task_context = createTaskContext();
+ merge_list_entry = storage.getContext()->getMergeList().insert(
+ storage.getStorageID(),
+ future_part,
+ task_context);
+
+ stopwatch = std::make_unique<Stopwatch>();
+
+ write_part_log = [this] (const ExecutionStatus & execution_status)
+ {
+ auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
+ mutate_task.reset();
+ storage.writePartLog(
+ PartLogElement::MUTATE_PART,
+ execution_status,
+ stopwatch->elapsed(),
+ future_part->name,
+ new_part,
+ future_part->parts,
+ merge_list_entry.get(),
+ std::move(profile_counters_snapshot));
+ };
+
+ if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
+ {
+ /// If we have a prepared sets cache for this mutations, we will use it.
+ auto mutation_id = future_part->part_info.mutation;
+ auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
+ task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
+ }
+
+ mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
+ future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
+ time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
+}
+
+
+bool MutatePlainMergeTreeTask::executeStep()
+{
+ /// Metrics will be saved in the local profile_counters.
+ ProfileEventsScope profile_events_scope(&profile_counters);
+
+ /// Make out memory tracker a parent of current thread memory tracker
+ std::optional<ThreadGroupSwitcher> switcher;
+ if (merge_list_entry)
+ switcher.emplace((*merge_list_entry)->thread_group);
+
+ switch (state)
+ {
+ case State::NEED_PREPARE:
+ {
+ prepare();
+ state = State::NEED_EXECUTE;
+ return true;
+ }
+ case State::NEED_EXECUTE:
+ {
+ try
+ {
+ if (mutate_task->execute())
+ return true;
+
+ new_part = mutate_task->getFuture().get();
+ auto & data_part_storage = new_part->getDataPartStorage();
+ if (data_part_storage.hasActiveTransaction())
+ data_part_storage.precommitTransaction();
+
+ MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
+ /// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
+ storage.renameTempPartAndReplace(new_part, transaction);
+ transaction.commit();
+
+ storage.updateMutationEntriesErrors(future_part, true, "");
+ write_part_log({});
+
+ state = State::NEED_FINISH;
+ return true;
+ }
+ catch (...)
+ {
+ if (merge_mutate_entry->txn)
+ merge_mutate_entry->txn->onException();
+ PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
+ LOG_ERROR(&Poco::Logger::get("MutatePlainMergeTreeTask"), exception_message);
+ storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
+ write_part_log(ExecutionStatus::fromCurrentException("", true));
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ return false;
+ }
+ }
+ case State::NEED_FINISH:
+ {
+ // Nothing to do
+ state = State::SUCCESS;
+ return false;
+ }
+ case State::SUCCESS:
+ {
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Task with state SUCCESS mustn't be executed again");
+ }
+ }
+
+ return false;
+}
+
+ContextMutablePtr MutatePlainMergeTreeTask::createTaskContext() const
+{
+ auto context = Context::createCopy(storage.getContext());
+ context->makeQueryContext();
+ auto queryId = getQueryId();
+ context->setCurrentQueryId(queryId);
+ return context;
+}
+
+}