summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp')
-rw-r--r--contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp703
1 file changed, 703 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp b/contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp
new file mode 100644
index 00000000000..128972b8ff0
--- /dev/null
+++ b/contrib/clickhouse/src/QueryPipeline/QueryPipeline.cpp
@@ -0,0 +1,703 @@
+#include <QueryPipeline/QueryPipeline.h>
+
+#include <queue>
+#include <QueryPipeline/Chain.h>
+#include <Processors/Formats/IOutputFormat.h>
+#include <Processors/IProcessor.h>
+#include <Processors/LimitTransform.h>
+#include <Interpreters/ActionsDAG.h>
+#include <Interpreters/ExpressionActions.h>
+#include <QueryPipeline/ReadProgressCallback.h>
+#include <QueryPipeline/Pipe.h>
+#include <Processors/Sinks/EmptySink.h>
+#include <Processors/Sinks/NullSink.h>
+#include <Processors/Sinks/SinkToStorage.h>
+#include <Processors/Sources/DelayedSource.h>
+#include <Processors/Sources/NullSource.h>
+#include <Processors/Sources/RemoteSource.h>
+#include <Processors/Sources/SourceFromChunks.h>
+#include <Processors/ISource.h>
+#include <Processors/Transforms/CountingTransform.h>
+#include <Processors/Transforms/LimitsCheckingTransform.h>
+#include <Processors/Transforms/MaterializingTransform.h>
+#include <Processors/Transforms/PartialSortingTransform.h>
+#include <Processors/Transforms/StreamInQueryCacheTransform.h>
+#include <Processors/Transforms/ExpressionTransform.h>
+#include <Processors/Transforms/TotalsHavingTransform.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    /// The only error code used in this file: every failure here is an internal invariant violation.
+    extern const int LOGICAL_ERROR;
+}
+
+QueryPipeline::QueryPipeline() : processors(std::make_shared<Processors>())
+{
+    /// An empty pipeline: no processors yet, no open input or output ports.
+}
+
+/// Move-only type; the compiler-generated members are correct because all state
+/// is held in movable RAII members (shared_ptr, resource holder, raw observing pointers).
+QueryPipeline::QueryPipeline(QueryPipeline &&) noexcept = default;
+QueryPipeline & QueryPipeline::operator=(QueryPipeline &&) noexcept = default;
+QueryPipeline::~QueryPipeline() = default;
+
+/// Every input port of every processor in a valid pipeline must be connected.
+static void checkInput(const InputPort & input, const ProcessorPtr & processor)
+{
+    if (input.isConnected())
+        return;
+
+    throw Exception(
+        ErrorCodes::LOGICAL_ERROR,
+        "Cannot create QueryPipeline because {} has disconnected input",
+        processor->getName());
+}
+
+/// Every output port of every processor in a valid pipeline must be connected.
+static void checkOutput(const OutputPort & output, const ProcessorPtr & processor)
+{
+    if (output.isConnected())
+        return;
+
+    throw Exception(
+        ErrorCodes::LOGICAL_ERROR,
+        "Cannot create QueryPipeline because {} has disconnected output",
+        processor->getName());
+}
+
+/// Validate the invariants of a pulling pipeline: `output` must be a disconnected port
+/// owned by one of `processors`; optional `totals`/`extremes` likewise must be
+/// disconnected and owned by the graph; every other port of every processor must
+/// already be connected.
+static void checkPulling(
+    Processors & processors,
+    OutputPort * output,
+    OutputPort * totals,
+    OutputPort * extremes)
+{
+    if (!output || output->isConnected())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its output port is connected or null");
+
+    if (totals && totals->isConnected())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its totals port is connected");
+
+    if (extremes && extremes->isConnected())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its extremes port is connected");
+
+    /// Walk all ports once to verify that the three special ports actually belong
+    /// to the processor graph (and that everything else is wired up).
+    bool found_output = false;
+    bool found_totals = false;
+    bool found_extremes = false;
+    for (const auto & processor : processors)
+    {
+        for (const auto & in : processor->getInputs())
+            checkInput(in, processor);
+
+        for (const auto & out : processor->getOutputs())
+        {
+            if (&out == output)
+                found_output = true;
+            else if (totals && &out == totals)
+                found_totals = true;
+            else if (extremes && &out == extremes)
+                found_extremes = true;
+            else
+                checkOutput(out, processor);
+        }
+    }
+
+    if (!found_output)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its output port does not belong to any processor");
+    if (totals && !found_totals)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its totals port does not belong to any processor");
+    if (extremes && !found_extremes)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pulling QueryPipeline because its extremes port does not belong to any processor");
+}
+
+/// A completed pipeline has no loose ends: every port of every processor is connected.
+static void checkCompleted(Processors & processors)
+{
+    for (const auto & node : processors)
+    {
+        for (const auto & input_port : node->getInputs())
+            checkInput(input_port, node);
+
+        for (const auto & output_port : node->getOutputs())
+            checkOutput(output_port, node);
+    }
+}
+
+/// Find the processors that should count "rows read before LIMIT was applied" for
+/// the given output format, and wire a single shared RowsBeforeLimitCounter into all
+/// of them. The processor graph is traversed backwards — from the format towards the
+/// sources — with a BFS, remembering the nearest LimitTransform seen on the path.
+static void initRowsBeforeLimit(IOutputFormat * output_format)
+{
+    RowsBeforeLimitCounterPtr rows_before_limit_at_least;
+    std::vector<IProcessor *> processors;
+    std::map<LimitTransform *, std::vector<size_t>> limit_candidates;
+    std::unordered_set<IProcessor *> visited;
+    bool has_limit = false;
+
+    /// BFS state: current processor plus the nearest LimitTransform above it (if any)
+    /// and which of that limit's input ports the path descended through (-1 if none).
+    struct QueuedEntry
+    {
+        IProcessor * processor;
+        LimitTransform * limit_processor;
+        ssize_t limit_input_port;
+    };
+
+    std::queue<QueuedEntry> queue;
+
+    queue.push({ output_format, nullptr, -1 });
+    visited.emplace(output_format);
+
+    while (!queue.empty())
+    {
+        auto * processor = queue.front().processor;
+        auto * limit_processor = queue.front().limit_processor;
+        auto limit_input_port = queue.front().limit_input_port;
+        queue.pop();
+
+        /// Set counter based on the following cases:
+        /// 1. Remote: Set counter on Remote
+        /// 2. Limit ... PartialSorting: Set counter on PartialSorting
+        /// 3. Limit ... TotalsHaving(with filter) ... Remote: Set counter on the input port of Limit
+        /// 4. Limit ... Remote: Set counter on Remote
+        /// 5. Limit ... : Set counter on the input port of Limit
+
+        /// Case 1.
+        if ((typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor)) && !limit_processor)
+        {
+            processors.emplace_back(processor);
+            continue;
+        }
+
+        if (auto * limit = typeid_cast<LimitTransform *>(processor))
+        {
+            has_limit = true;
+
+            /// Ignore child limits
+            if (limit_processor)
+                continue;
+
+            limit_processor = limit;
+            limit_candidates[limit_processor] = {};
+        }
+        else if (limit_processor)
+        {
+            /// Case 2.
+            if (typeid_cast<PartialSortingTransform *>(processor))
+            {
+                processors.emplace_back(processor);
+                limit_candidates[limit_processor].push_back(limit_input_port);
+                continue;
+            }
+
+            /// Case 3.
+            /// A filtering TotalsHaving stops the search on this branch: rows below it
+            /// do not correspond to rows reaching the limit.
+            if (auto * having = typeid_cast<TotalsHavingTransform *>(processor))
+            {
+                if (having->hasFilter())
+                    continue;
+            }
+
+            /// Case 4.
+            if (typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor))
+            {
+                processors.emplace_back(processor);
+                limit_candidates[limit_processor].push_back(limit_input_port);
+                continue;
+            }
+        }
+
+        /// Skip totals and extremes port for output format.
+        if (auto * format = dynamic_cast<IOutputFormat *>(processor))
+        {
+            auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
+            if (visited.emplace(child_processor).second)
+                queue.push({ child_processor, limit_processor, limit_input_port });
+
+            continue;
+        }
+
+        /// When descending through a LimitTransform, remember which of its input ports
+        /// each child branch belongs to; otherwise propagate the current port unchanged.
+        if (limit_processor == processor)
+        {
+            ssize_t i = 0;
+            for (auto & child_port : processor->getInputs())
+            {
+                auto * child_processor = &child_port.getOutputPort().getProcessor();
+                if (visited.emplace(child_processor).second)
+                    queue.push({ child_processor, limit_processor, i });
+                ++i;
+            }
+        }
+        else
+        {
+            for (auto & child_port : processor->getInputs())
+            {
+                auto * child_processor = &child_port.getOutputPort().getProcessor();
+                if (visited.emplace(child_processor).second)
+                    queue.push({ child_processor, limit_processor, limit_input_port });
+            }
+        }
+    }
+
+    /// Case 5.
+    for (auto && [limit, ports] : limit_candidates)
+    {
+        /// If there are some input ports which don't have the counter, add it to LimitTransform.
+        if (ports.size() < limit->getInputs().size())
+        {
+            processors.push_back(limit);
+            for (auto port : ports)
+                limit->setInputPortHasCounter(port);
+        }
+    }
+
+    if (!processors.empty())
+    {
+        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+        for (auto & processor : processors)
+            processor->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+
+        /// If there is a limit, then enable rows_before_limit_at_least
+        /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
+        if (has_limit)
+            rows_before_limit_at_least->add(0);
+
+        output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+    }
+}
+
+
+/// Completed pipeline: takes ownership of the processor graph and verifies that
+/// every port of every processor is connected (no open input or output ports).
+QueryPipeline::QueryPipeline(
+    QueryPlanResourceHolder resources_,
+    std::shared_ptr<Processors> processors_)
+    : resources(std::move(resources_))
+    , processors(std::move(processors_))
+{
+    checkCompleted(*processors);
+}
+
+/// Pushing pipeline: `input_` is the single disconnected input port through which the
+/// caller will push data. It must be owned by one of `processors_`; every other port
+/// of the graph must already be connected.
+QueryPipeline::QueryPipeline(
+    QueryPlanResourceHolder resources_,
+    std::shared_ptr<Processors> processors_,
+    InputPort * input_)
+    : resources(std::move(resources_))
+    , processors(std::move(processors_))
+    , input(input_)
+{
+    if (!input || input->isConnected())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pushing QueryPipeline because its input port is connected or null");
+
+    /// Walk all ports once: locate `input` among the processors' inputs and verify
+    /// everything else is connected.
+    bool found_input = false;
+    for (const auto & processor : *processors)
+    {
+        for (const auto & in : processor->getInputs())
+        {
+            if (&in == input)
+                found_input = true;
+            else
+                checkInput(in, processor);
+        }
+
+        for (const auto & out : processor->getOutputs())
+            checkOutput(out, processor);
+    }
+
+    if (!found_input)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot create pushing QueryPipeline because its input port does not belong to any processor");
+}
+
+/// A single source becomes a pulling pipeline by wrapping it into a one-stream Pipe.
+QueryPipeline::QueryPipeline(std::shared_ptr<ISource> source) : QueryPipeline(Pipe(std::move(source))) {}
+
+/// Pulling pipeline: data is pulled from `output_`, with optional `totals_`/`extremes_`
+/// side streams. All given ports must be disconnected and owned by `processors_`
+/// (validated by checkPulling).
+QueryPipeline::QueryPipeline(
+    QueryPlanResourceHolder resources_,
+    std::shared_ptr<Processors> processors_,
+    OutputPort * output_,
+    OutputPort * totals_,
+    OutputPort * extremes_)
+    : resources(std::move(resources_))
+    , processors(std::move(processors_))
+    , output(output_)
+    , totals(totals_)
+    , extremes(extremes_)
+{
+    checkPulling(*processors, output, totals, extremes);
+}
+
+/// Build a pipeline from a Pipe: a pipe without output ports becomes a completed
+/// pipeline; otherwise the pipe is squashed to a single stream and pulled from.
+QueryPipeline::QueryPipeline(Pipe pipe)
+{
+    if (pipe.numOutputPorts() == 0)
+    {
+        processors = std::move(pipe.processors);
+        checkCompleted(*processors);
+        return;
+    }
+
+    pipe.resize(1);
+    output = pipe.getOutputPort(0);
+    totals = pipe.getTotalsPort();
+    extremes = pipe.getExtremesPort();
+
+    processors = std::move(pipe.processors);
+    checkPulling(*processors, output, totals, extremes);
+}
+
+/// Build a pushing pipeline from a Chain: adopt all of the chain's processors and
+/// terminate its output with an EmptySink, so data pushed into `input` flows through
+/// the chain and is discarded at the end.
+QueryPipeline::QueryPipeline(Chain chain)
+    : resources(chain.detachResources())
+    , processors(std::make_shared<Processors>())
+    , input(&chain.getInputPort())
+    , num_threads(chain.getNumThreads())
+{
+    processors->reserve(chain.getProcessors().size() + 1);
+    for (auto processor : chain.getProcessors())
+        processors->emplace_back(std::move(processor));
+
+    auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader());
+    connect(chain.getOutputPort(), sink->getPort());
+    processors->emplace_back(std::move(sink));
+
+    /// NOTE: `input` is already initialized in the member initializer list above;
+    /// a redundant reassignment that used to follow here was removed.
+}
+
+/// Pushing pipeline that writes straight into an output format: the format's main
+/// port becomes the pipeline input, while its totals/extremes ports are fed by
+/// NullSources (the format always consumes all three ports).
+QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
+    : processors(std::make_shared<Processors>())
+{
+    auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
+    auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
+    auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
+
+    /// NOTE(review): `totals`/`extremes` are members of a just-constructed pipeline and
+    /// appear to always be null at this point, so the NullSources below are always
+    /// created — confirm against the members' default initializers in the header.
+    if (!totals)
+    {
+        auto source = std::make_shared<NullSource>(format_totals.getHeader());
+        totals = &source->getPort();
+        processors->emplace_back(std::move(source));
+    }
+
+    if (!extremes)
+    {
+        auto source = std::make_shared<NullSource>(format_extremes.getHeader());
+        extremes = &source->getPort();
+        processors->emplace_back(std::move(source));
+    }
+
+    connect(*totals, format_totals);
+    connect(*extremes, format_extremes);
+
+    /// After connecting, only the main port stays open; it is the pipeline's input.
+    input = &format_main;
+    totals = nullptr;
+    extremes = nullptr;
+
+    output_format = format.get();
+
+    processors->emplace_back(std::move(format));
+}
+
+/// Terminate an optional output port (totals/extremes) with a NullSink that swallows
+/// its data, then forget the port. No-op when the port is absent.
+static void drop(OutputPort *& port, Processors & processors)
+{
+    if (port == nullptr)
+        return;
+
+    auto sink = std::make_shared<NullSink>(port->getHeader());
+    connect(*port, sink->getPort());
+    processors.emplace_back(std::move(sink));
+    port = nullptr;
+}
+
+/// A storage sink alone is treated as a one-link Chain, producing a pushing pipeline.
+QueryPipeline::QueryPipeline(std::shared_ptr<SinkToStorage> sink) : QueryPipeline(Chain(std::move(sink))) {}
+
+/// Terminate a pulling pipeline with an arbitrary sink; the totals and extremes
+/// streams (if present) are discarded into NullSinks.
+void QueryPipeline::complete(std::shared_ptr<ISink> sink)
+{
+    if (!pulling())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with sink");
+
+    for (auto * side_port : {&totals, &extremes})
+        drop(*side_port, *processors);
+
+    connect(*output, sink->getPort());
+    output = nullptr;
+    processors->emplace_back(std::move(sink));
+}
+
+/// Complete a pulling pipeline by pushing its main stream through `chain`, whose
+/// output is discarded into an EmptySink; totals/extremes are dropped into NullSinks.
+void QueryPipeline::complete(Chain chain)
+{
+    if (!pulling())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with chain");
+
+    /// NOTE(review): this replaces the pipeline's resources with the chain's rather
+    /// than merging them — confirm this is intended.
+    resources = chain.detachResources();
+
+    drop(totals, *processors);
+    drop(extremes, *processors);
+
+    processors->reserve(processors->size() + chain.getProcessors().size() + 1);
+    for (auto processor : chain.getProcessors())
+        processors->emplace_back(std::move(processor));
+
+    auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader());
+    connect(*output, chain.getInputPort());
+    connect(chain.getOutputPort(), sink->getPort());
+    processors->emplace_back(std::move(sink));
+    output = nullptr;
+}
+
+/// A storage sink is completed as a one-link chain.
+void QueryPipeline::complete(std::shared_ptr<SinkToStorage> sink)
+{
+    Chain chain(std::move(sink));
+    complete(std::move(chain));
+}
+
+/// Complete a pushing pipeline by attaching `pipe` as the data producer: the pipe is
+/// squashed to one stream, its totals/extremes are dropped, and its output is
+/// connected to this pipeline's input port.
+void QueryPipeline::complete(Pipe pipe)
+{
+    if (!pushing())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pushing to be completed with pipe");
+
+    pipe.resize(1);
+    pipe.dropExtremes();
+    pipe.dropTotals();
+    connect(*pipe.getOutputPort(0), *input);
+    input = nullptr;
+
+    /// Adopt the pipe's processors into this pipeline's graph.
+    auto pipe_processors = Pipe::detachProcessors(std::move(pipe));
+    processors->insert(processors->end(), pipe_processors.begin(), pipe_processors.end());
+}
+
+/// If the port exists, pipe it through a MaterializingTransform (used for formats
+/// that require fully materialized columns).
+static void addMaterializing(OutputPort *& output, Processors & processors)
+{
+    if (output == nullptr)
+        return;
+
+    auto transform = std::make_shared<MaterializingTransform>(output->getHeader());
+    connect(*output, transform->getInputPort());
+    output = &transform->getOutputPort();
+    processors.emplace_back(std::move(transform));
+}
+
+/// Complete a pulling pipeline with an output format: materialize columns if the
+/// format requires it, feed absent totals/extremes with NullSources, wire the
+/// rows-before-limit counter, and connect all three streams to the format's ports.
+void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
+{
+    if (!pulling())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with output format");
+
+    if (format->expectMaterializedColumns())
+    {
+        addMaterializing(output, *processors);
+        addMaterializing(totals, *processors);
+        addMaterializing(extremes, *processors);
+    }
+
+    auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
+    auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
+    auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
+
+    /// The format consumes all three ports, so missing side streams get NullSources.
+    if (!totals)
+    {
+        auto source = std::make_shared<NullSource>(format_totals.getHeader());
+        totals = &source->getPort();
+        processors->emplace_back(std::move(source));
+    }
+
+    if (!extremes)
+    {
+        auto source = std::make_shared<NullSource>(format_extremes.getHeader());
+        extremes = &source->getPort();
+        processors->emplace_back(std::move(source));
+    }
+
+    connect(*output, format_main);
+    connect(*totals, format_totals);
+    connect(*extremes, format_extremes);
+
+    /// All streams are now connected; the pipeline is completed.
+    output = nullptr;
+    totals = nullptr;
+    extremes = nullptr;
+
+    initRowsBeforeLimit(format.get());
+    output_format = format.get();
+
+    processors->emplace_back(std::move(format));
+}
+
+/// Header of the single open port: the input for a pushing pipeline, the output for
+/// a pulling one. Throws for completed (or empty) pipelines.
+Block QueryPipeline::getHeader() const
+{
+    if (input)
+        return input->getHeader();
+
+    if (output)
+        return output->getHeader();
+
+    throw Exception(
+        ErrorCodes::LOGICAL_ERROR,
+        "Header is available only for pushing or pulling QueryPipeline");
+}
+
+/// Remember the progress callback; it is later handed to sources via getReadProgressCallback().
+void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
+{
+    progress_callback = callback;
+}
+
+/// Remember the query status entry. For a pushing pipeline, also propagate it to the
+/// CountingTransform at the entry point (if any) so inserted rows are accounted.
+void QueryPipeline::setProcessListElement(QueryStatusPtr elem)
+{
+    process_list_element = elem;
+
+    if (!pushing())
+        return;
+
+    auto * counting = dynamic_cast<CountingTransform *>(&input->getProcessor());
+    if (counting)
+        counting->setProcessListElement(elem);
+}
+
+/// Remember the quota; it is later handed to sources via getReadProgressCallback().
+void QueryPipeline::setQuota(std::shared_ptr<const EnabledQuota> quota_)
+{
+    quota = std::move(quota_);
+}
+
+/// Insert a LimitsCheckingTransform (with the given quota) right before the output
+/// port of a pulling pipeline.
+void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::shared_ptr<const EnabledQuota> quota_)
+{
+    if (!pulling())
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "It is possible to set limits and quota only to pulling QueryPipeline");
+
+    auto checker = std::make_shared<LimitsCheckingTransform>(output->getHeader(), limits);
+    checker->setQuota(quota_);
+    connect(*output, checker->getInputPort());
+    output = &checker->getOutputPort();
+    processors->emplace_back(std::move(checker));
+}
+
+/// Report how many rows/bytes the output format produced. Only available when the
+/// pipeline was completed with an output format; returns false otherwise.
+bool QueryPipeline::tryGetResultRowsAndBytes(UInt64 & result_rows, UInt64 & result_bytes) const
+{
+    if (output_format == nullptr)
+        return false;
+
+    result_rows = output_format->getResultRows();
+    result_bytes = output_format->getResultBytes();
+    return true;
+}
+
+/// Tee every open output stream of a pulling pipeline into the query cache.
+void QueryPipeline::writeResultIntoQueryCache(std::shared_ptr<QueryCache::Writer> query_cache_writer)
+{
+    assert(pulling());
+
+    /// Attach a special transform to all output ports (result + possibly totals/extremes). The only purpose of the transform is
+    /// to write each chunk into the query cache. All transforms hold a refcounted reference to the same query cache writer object.
+    /// This ensures that all transforms write to the single same cache entry. The writer object synchronizes internally, the
+    /// expensive stuff like cloning chunks happens outside lock scopes).
+
+    auto add_stream_in_query_cache_transform = [&](OutputPort *& out_port, QueryCache::Writer::ChunkType chunk_type)
+    {
+        /// Skip absent streams; otherwise splice the caching transform into the port.
+        if (!out_port)
+            return;
+
+        auto transform = std::make_shared<StreamInQueryCacheTransform>(out_port->getHeader(), query_cache_writer, chunk_type);
+        connect(*out_port, transform->getInputPort());
+        out_port = &transform->getOutputPort();
+        processors->emplace_back(std::move(transform));
+    };
+
+    using enum QueryCache::Writer::ChunkType;
+
+    add_stream_in_query_cache_transform(output, Result);
+    add_stream_in_query_cache_transform(totals, Totals);
+    add_stream_in_query_cache_transform(extremes, Extremes);
+}
+
+/// The pipeline can contain up to three StreamInQueryCacheTransforms which all point
+/// to the same query cache writer object, so finalizing the first one found is enough.
+void QueryPipeline::finalizeWriteInQueryCache()
+{
+    for (auto & processor : *processors)
+    {
+        if (auto * transform = dynamic_cast<StreamInQueryCacheTransform *>(processor.get()))
+        {
+            transform->finalizeWriteInQueryCache();
+            return;
+        }
+    }
+}
+
+/// Build a pulling pipeline from sources supplied by the query cache, which replay
+/// chunks of a previous query result (main stream plus optional totals/extremes).
+void QueryPipeline::readFromQueryCache(
+    std::unique_ptr<SourceFromChunks> source,
+    std::unique_ptr<SourceFromChunks> source_totals,
+    std::unique_ptr<SourceFromChunks> source_extremes)
+{
+    auto attach = [&](OutputPort *& port, std::unique_ptr<SourceFromChunks> src)
+    {
+        if (!src)
+            return;
+        port = &src->getPort();
+        processors->emplace_back(std::shared_ptr<SourceFromChunks>(std::move(src)));
+    };
+
+    attach(output, std::move(source));
+    attach(totals, std::move(source_totals));
+    attach(extremes, std::move(source_extremes));
+}
+
+/// Keep the storage alive for the lifetime of this pipeline's resources.
+void QueryPipeline::addStorageHolder(StoragePtr storage)
+{
+    resources.storage_holders.emplace_back(std::move(storage));
+}
+
+/// Merge another *completed* pipeline into this one by appending its processors.
+void QueryPipeline::addCompletedPipeline(QueryPipeline other)
+{
+    if (!other.completed())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add not completed pipeline");
+
+    /// NOTE(review): this overwrites the current resources with `other`'s instead of
+    /// merging them — confirm callers do not rely on the previously held resources.
+    resources = std::move(other.resources);
+    processors->insert(processors->end(), other.processors->begin(), other.processors->end());
+}
+
+/// Drop all current state and become a default-constructed (empty) pipeline.
+void QueryPipeline::reset()
+{
+    /// Move the old state into a local first, so it is destroyed at scope exit —
+    /// after *this has already been replaced with a fresh pipeline.
+    QueryPipeline to_remove = std::move(*this);
+    *this = QueryPipeline();
+}
+
+/// If the port exists, splice in an ExpressionTransform applying `actions` to the stream.
+static void addExpression(OutputPort *& port, ExpressionActionsPtr actions, Processors & processors)
+{
+    if (!port)
+        return;
+
+    auto transform = std::make_shared<ExpressionTransform>(port->getHeader(), actions);
+    connect(*port, transform->getInputPort());
+    port = &transform->getOutputPort();
+    processors.emplace_back(std::move(transform));
+}
+
+/// Convert the header of a pulling pipeline to the given column structure by applying
+/// position-based converting actions to every open stream (output/totals/extremes).
+void QueryPipeline::convertStructureTo(const ColumnsWithTypeAndName & columns)
+{
+    if (!pulling())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to convert header");
+
+    auto dag = ActionsDAG::makeConvertingActions(
+        output->getHeader().getColumnsWithTypeAndName(),
+        columns,
+        ActionsDAG::MatchColumnsMode::Position);
+    auto expression = std::make_shared<ExpressionActions>(std::move(dag));
+
+    for (auto * port : {&output, &totals, &extremes})
+        addExpression(*port, expression, *processors);
+}
+
+/// Package the pipeline's progress-related state (callback, quota, query status)
+/// into a callback object that sources consult while reading.
+std::unique_ptr<ReadProgressCallback> QueryPipeline::getReadProgressCallback() const
+{
+    auto progress = std::make_unique<ReadProgressCallback>();
+    progress->setProgressCallback(progress_callback);
+    progress->setQuota(quota);
+    progress->setProcessListElement(process_list_element);
+
+    if (!update_profile_events)
+        progress->disableProfileEventUpdate();
+
+    return progress;
+}
+
+}