1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
#include <Storages/ReadFinalForExternalReplicaStorage.h>
#if USE_MYSQL || USE_LIBPQXX
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
namespace DB
{
bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage)
{
const StorageMetadataPtr & metadata = storage->getInMemoryMetadataPtr();
Block header = metadata->getSampleBlock();
ColumnWithTypeAndName & version_column = header.getByPosition(header.columns() - 1);
return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
}
/// Builds a read from `nested_storage` into `query_plan`, filtering on the sign column when needed.
///
/// The second-to-last column of the nested storage's sample block is taken to be the sign column.
/// If `column_names` does not already contain it, the column is appended to the set of columns to read
/// and an `equals(<sign column>, 1)` FilterStep is added on top of the read.
/// If the sign column was requested explicitly, no filter is added.
///
/// When the nested read leaves `query_plan` uninitialized, an empty source with the nested sample block
/// as header is added instead, and neither the table lock nor the filter is attached.
///
/// NOTE(review): the sample block is assumed to have at least two columns; `columns() - 2` is not guarded here.
void readFinalFromNestedStorage(
    QueryPlan & query_plan,
    StoragePtr nested_storage,
    const Names & column_names,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    const NameSet requested_columns(column_names.begin(), column_names.end());

    auto table_lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
    const auto & metadata_snapshot = nested_storage->getInMemoryMetadataPtr();

    Block sample_block = metadata_snapshot->getSampleBlock();
    const auto & sign_name = sample_block.getByPosition(sample_block.columns() - 2).name;

    /// An empty request never contains the sign column, so a single lookup covers both cases.
    const bool add_sign_filter = !requested_columns.contains(sign_name);

    Names columns_to_read = column_names;
    if (add_sign_filter)
        columns_to_read.emplace_back(sign_name);

    auto storage_snapshot = nested_storage->getStorageSnapshot(metadata_snapshot, context);
    nested_storage->read(
        query_plan, columns_to_read, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);

    if (!query_plan.isInitialized())
    {
        InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, sample_block, query_info, context);
        return;
    }

    /// Keep the lock and the storage referenced by the plan.
    query_plan.addTableLock(table_lock);
    query_plan.addStorageHolder(nested_storage);

    if (!add_sign_filter)
        return;

    /// Expression list holding the single predicate `equals(<sign column>, 1)`.
    auto sign_identifier = std::make_shared<ASTIdentifier>(sign_name);
    auto sign_value = std::make_shared<ASTLiteral>(Field(static_cast<Int8>(1)));

    ASTPtr filter_expressions = std::make_shared<ASTExpressionList>();
    filter_expressions->children.emplace_back(makeASTFunction("equals", sign_identifier, sign_value));
    String filter_column_name = filter_expressions->children.back()->getColumnName();

    const auto & current_stream = query_plan.getCurrentDataStream();
    auto syntax_result = TreeRewriter(context).analyze(filter_expressions, current_stream.header.getNamesAndTypesList());
    auto filter_actions
        = ExpressionAnalyzer(filter_expressions, syntax_result, context).getActionsDAG(true /* add_aliases */, false /* project_result */);

    auto filter_step = std::make_unique<FilterStep>(
        current_stream,
        filter_actions,
        filter_column_name,
        false /* the filter column is not removed from the result */);

    filter_step->setStepDescription("Filter columns");
    query_plan.addStep(std::move(filter_step));
}
}
#endif
|