1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
#include <Storages/System/StorageSystemDataSkippingIndices.h>
#include <Access/ContextAccess.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>
namespace DB
{
/// Declares the schema of system.data_skipping_indices: one row per secondary (data skipping)
/// index, describing where it lives (database, table), how it is declared
/// (name, type, type_full, expr, granularity) and its size statistics
/// (data_compressed_bytes, data_uncompressed_bytes, marks).
StorageSystemDataSkippingIndices::StorageSystemDataSkippingIndices(const StorageID & table_id_)
    : IStorage(table_id_)
{
    ColumnsDescription schema(
    {
        /// Location of the index.
        { "database", std::make_shared<DataTypeString>() },
        { "table", std::make_shared<DataTypeString>() },

        /// Declaration of the index.
        { "name", std::make_shared<DataTypeString>() },
        { "type", std::make_shared<DataTypeString>() },
        { "type_full", std::make_shared<DataTypeString>() },
        { "expr", std::make_shared<DataTypeString>() },
        { "granularity", std::make_shared<DataTypeUInt64>() },

        /// Size statistics of the index.
        { "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
        { "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
        { "marks", std::make_shared<DataTypeUInt64>() },
    });

    StorageInMemoryMetadata metadata;
    metadata.setColumns(std::move(schema));
    setInMemoryMetadata(metadata);
}
/// Source that produces the rows of system.data_skipping_indices.
///
/// It walks a column of database names. For each database that can still be resolved, it
/// iterates its tables and emits one row per secondary index found in each visible table's
/// in-memory metadata. The iteration position (database_idx, database, tables_it) is kept in
/// members, so successive generate() calls resume where the previous chunk stopped.
class DataSkippingIndicesSource : public ISource
{
public:
    /// @param columns_mask_   One flag per column of the full table schema, in schema order;
    ///                        a non-zero flag means the column was requested and must be filled.
    /// @param header          Structure of the produced chunks (the requested columns only).
    /// @param max_block_size_ Row limit per chunk. It is checked between tables, so it may be
    ///                        exceeded (see generate()).
    /// @param databases_      String column with the names of the databases to scan.
    /// @param context_        Query context; the source stores a copy made via Context::createCopy.
    DataSkippingIndicesSource(
        std::vector<UInt8> columns_mask_,
        Block header,
        UInt64 max_block_size_,
        ColumnPtr databases_,
        ContextPtr context_)
        : ISource(std::move(header))
        , column_mask(std::move(columns_mask_))
        , max_block_size(max_block_size_)
        , databases(std::move(databases_))
        , context(Context::createCopy(context_))
        , database_idx(0)
    {}

    String getName() const override { return "DataSkippingIndices"; }

protected:
    Chunk generate() override
    {
        /// Every database was handled by previous calls: an empty chunk signals end of data.
        if (database_idx >= databases->size())
            return {};

        MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();

        const auto access = context->getAccess();
        /// Finer-grained SHOW_TABLES checks are only needed when it is not granted globally.
        const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);

        size_t rows_count = 0;
        while (rows_count < max_block_size)
        {
            /// The tables of the current database are exhausted: advance to the next database.
            if (tables_it && !tables_it->isValid())
                ++database_idx;

            /// Resolve the next database that still exists. tryGetDatabase may return null,
            /// e.g. if the database was dropped after its name was collected. This is skipped
            /// when resuming a database whose tables iterator is still valid.
            while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
            {
                database_name = databases->getDataAt(database_idx).toString();
                database = DatabaseCatalog::instance().tryGetDatabase(database_name);

                if (database)
                    break;
                ++database_idx;
            }

            if (database_idx >= databases->size())
                break;

            /// Start iterating a freshly selected database; otherwise continue the saved iterator.
            if (!tables_it || !tables_it->isValid())
                tables_it = database->getTablesIterator(context);

            const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);

            /// NOTE: the row limit is only checked between tables. A chunk can therefore exceed
            /// max_block_size by up to the number of indices of the last table added to it.
            for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
            {
                auto table_name = tables_it->name();
                if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, table_name))
                    continue;

                const auto table = tables_it->table();
                if (!table)
                    continue;

                const StorageMetadataPtr metadata_snapshot = table->getInMemoryMetadataPtr();
                if (!metadata_snapshot)
                    continue;

                /// Bind by reference: metadata_snapshot keeps the metadata alive for this loop,
                /// so there is no need to copy the index list for every table.
                const auto & indices = metadata_snapshot->getSecondaryIndices();

                /// Local copy, so operator[] below may insert a default-constructed entry for
                /// an index without recorded sizes without touching the table itself.
                auto secondary_index_sizes = table->getSecondaryIndexSizes();

                for (const auto & index : indices)
                {
                    ++rows_count;

                    /// src_index walks the full schema; res_index walks only the requested columns.
                    size_t src_index = 0;
                    size_t res_index = 0;

                    // 'database' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(database_name);
                    // 'table' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(table_name);
                    // 'name' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(index.name);
                    // 'type' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(index.type);
                    // 'type_full' column
                    if (column_mask[src_index++])
                    {
                        /// Guard against a missing declaration AST, as the 'expr' column does below.
                        const auto * declaration = index.definition_ast
                            ? index.definition_ast->as<ASTIndexDeclaration>()
                            : nullptr;
                        if (declaration && declaration->type)
                            res_columns[res_index++]->insert(queryToString(*declaration->type));
                        else
                            res_columns[res_index++]->insertDefault();
                    }
                    // 'expr' column
                    if (column_mask[src_index++])
                    {
                        if (auto expression = index.expression_list_ast)
                            res_columns[res_index++]->insert(queryToString(expression));
                        else
                            res_columns[res_index++]->insertDefault();
                    }
                    // 'granularity' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(index.granularity);

                    auto & secondary_index_size = secondary_index_sizes[index.name];

                    // 'compressed bytes' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(secondary_index_size.data_compressed);
                    // 'uncompressed bytes' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(secondary_index_size.data_uncompressed);
                    /// 'marks' column
                    if (column_mask[src_index++])
                        res_columns[res_index++]->insert(secondary_index_size.marks);
                }
            }
        }
        return Chunk(std::move(res_columns), rows_count);
    }

private:
    /// One flag per schema column: whether it was requested.
    std::vector<UInt8> column_mask;
    UInt64 max_block_size;
    /// Names of the databases to scan.
    ColumnPtr databases;
    ContextPtr context;

    /// Resumable iteration state shared across generate() calls.
    size_t database_idx;
    DatabasePtr database;
    std::string database_name;
    DatabaseTablesIteratorPtr tables_it;
};
/// Builds the pipe that reads system.data_skipping_indices.
///
/// The database names are collected up front and filtered with the query's condition on the
/// "database" column, so databases the query excludes are never scanned. The temporary
/// database and Lazy-engine databases are always left out. The per-table work is done lazily
/// by DataSkippingIndicesSource.
Pipe StorageSystemDataSkippingIndices::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum /* processed_stage */,
    size_t max_block_size,
    size_t /* num_streams */)
{
    storage_snapshot->check(column_names);

    /// Work out which schema columns were requested and the structure of the output chunks.
    Block full_schema_block = storage_snapshot->metadata->getSampleBlock();
    auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(full_schema_block, column_names);

    MutableColumnPtr candidate_names = ColumnString::create();
    const auto all_databases = DatabaseCatalog::instance().getDatabases();
    for (const auto & [db_name, db] : all_databases)
    {
        if (db_name == DatabaseCatalog::TEMPORARY_DATABASE)
            continue;

        /// Lazy database can contain only very primitive tables,
        /// it cannot contain tables with data skipping indices.
        /// Skip it to avoid unnecessary tables loading in the Lazy database.
        if (db->getEngineName() == "Lazy")
            continue;

        candidate_names->insert(db_name);
    }

    /// Condition on "database" in a query acts like an index.
    Block database_block { ColumnWithTypeAndName(std::move(candidate_names), std::make_shared<DataTypeString>(), "database") };
    VirtualColumnUtils::filterBlockWithQuery(query_info.query, database_block, context);

    auto source = std::make_shared<DataSkippingIndicesSource>(
        std::move(columns_mask),
        std::move(header),
        max_block_size,
        std::move(database_block.getByPosition(0).column),
        context);

    return Pipe(std::move(source));
}
}
|