1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
#pragma once
#include <memory>
#include <optional>
#include <Access/EnabledRowPolicies.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/StorageID.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
#include <QueryPipeline/Pipe.h>
#include <Columns/FilterDescription.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class SubqueryForSet;
class InterpreterSelectWithUnionQuery;
class Context;
class QueryPlan;
struct GroupingSetsParams;
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
struct TreeRewriterResult;
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
struct RowPolicy;
using RowPolicyPtr = std::shared_ptr<const RowPolicy>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
*/
class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
{
public:
/**
* query_ptr
* - A query AST to interpret.
*
* required_result_column_names
* - don't calculate all columns except the specified ones from the query
* - it is used to remove calculation (and reading) of unnecessary columns from subqueries.
* empty means - use all columns.
*/
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{});
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextMutablePtr & context_,
const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{});
/// Read data not from the table specified in the query, but from the prepared pipe `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
Pipe input_pipe_,
const SelectQueryOptions & = {});
/// Read data not from the table specified in the query, but from the specified `storage_`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
const StoragePtr & storage_,
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
const SelectQueryOptions & = {});
/// Reuse existing prepared_sets for another pass of analysis. It's used for projection.
/// TODO: Find a general way of sharing sets among different interpreters, such as subqueries.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
const SelectQueryOptions &,
PreparedSetsPtr prepared_sets_);
~InterpreterSelectQuery() override;
/// Execute a query. Get the stream of blocks to read.
BlockIO execute() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan) override;
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
void ignoreWithTotals() override;
ASTPtr getQuery() const { return query_ptr; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
SelectQueryExpressionAnalyzer * getQueryAnalyzer() const { return query_analyzer.get(); }
const ExpressionAnalysisResult & getAnalysisResult() const { return analysis_result; }
const Names & getRequiredColumns() const { return required_columns; }
bool hasAggregation() const { return query_analyzer->hasAggregation(); }
static void addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, const ContextPtr & context_);
Names getRequiredColumns() { return required_columns; }
bool supportsTransactions() const override { return true; }
FilterDAGInfoPtr getAdditionalQueryInfo() const { return additional_filter_info; }
RowPolicyFilterPtr getRowPolicyFilter() const;
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context) const override;
static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context);
static bool isQueryWithFinal(const SelectQueryInfo & info);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
std::optional<Pipe> input_pipe,
const StoragePtr & storage_,
const SelectQueryOptions &,
const Names & required_result_column_names = {},
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
PreparedSetsPtr prepared_sets_ = nullptr);
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextMutablePtr & context_,
std::optional<Pipe> input_pipe,
const StoragePtr & storage_,
const SelectQueryOptions &,
const Names & required_result_column_names = {},
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
PreparedSetsPtr prepared_sets_ = nullptr);
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
void addPrewhereAliasActions();
bool shouldMoveToPrewhere();
Block getSampleBlockImpl();
void executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
void executeAggregation(
QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final, bool has_grouping_sets);
void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool remove_filter, bool overflow_row, bool final);
void executeHaving(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
static void executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description);
/// FIXME should go through ActionsDAG to behave as a proper function
void executeWindow(QueryPlan & query_plan);
void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPlan & query_plan);
void executeMergeSorted(QueryPlan & query_plan, const std::string & description);
void executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset);
void executeLimitBy(QueryPlan & query_plan);
void executeLimit(QueryPlan & query_plan);
void executeOffset(QueryPlan & query_plan);
static void executeProjection(QueryPlan & query_plan, const ActionsDAGPtr & expression);
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
void executeExtremes(QueryPlan & query_plan);
void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan);
bool autoFinalOnQuery(ASTSelectQuery & select_query);
std::optional<UInt64> getTrivialCount(UInt64 max_parallel_replicas);
enum class Modificator
{
ROLLUP = 0,
CUBE = 1,
};
void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*
* Section SETTINGS - settings for a specific query.
* Normally, the settings can be passed in other ways, not inside the query.
* But the use of this section is justified if you need to set the settings for one subquery.
*/
void initSettings();
TreeRewriterResultPtr syntax_analyzer_result;
std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;
SelectQueryInfo query_info;
/// Is calculated in getSampleBlock. Is used later in readImpl.
ExpressionAnalysisResult analysis_result;
/// For row-level security.
RowPolicyFilterPtr row_policy_filter;
FilterDAGInfoPtr filter_info;
/// For additional_filter setting.
FilterDAGInfoPtr additional_filter_info;
/// For "per replica" filter when multiple replicas are used
FilterDAGInfoPtr parallel_replicas_custom_filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// List of columns to read to execute the query.
Names required_columns;
/// Structure of query source (table, subquery, etc).
Block source_header;
/// Actions to calculate ALIAS if required.
ActionsDAGPtr alias_actions;
/// The subquery interpreter, if the subquery
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery;
/// Table from where to read data, if not subquery.
StoragePtr storage;
StorageID table_id = StorageID::createEmpty(); /// Will be initialized if storage is not nullptr
TableLockHolder table_lock;
/// Used when we read from prepared input, not table or subquery.
std::optional<Pipe> input_pipe;
Poco::Logger * log;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
/// Reuse already built sets for multiple passes of analysis, possibly across interpreters.
PreparedSetsPtr prepared_sets;
};
}
|