1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
#pragma once
#include <Storages/IStorage.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <DataTypes/ObjectUtils.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Core/Defines.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// A Storage that allows reading from a single MergeTree data part.
class StorageFromMergeTreeDataPart final : public IStorage
{
public:
/// Used in part mutation.
explicit StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_))
, parts({part_})
, alter_conversions({part_->storage.getAlterConversionsForPart(part_)})
, storage(part_->storage)
, partition_id(part_->info.partition_id)
{
setInMemoryMetadata(storage.getInMemoryMetadata());
}
/// Used in queries with projection.
StorageFromMergeTreeDataPart(const MergeTreeData & storage_, MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr_)
: IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_)
{
setInMemoryMetadata(storage.getInMemoryMetadata());
}
String getName() const override { return "FromMergeTreeDataPart"; }
StorageSnapshotPtr getStorageSnapshot(
const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const override
{
const auto & storage_columns = metadata_snapshot->getColumns();
if (!hasDynamicSubcolumns(storage_columns))
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);
auto object_columns = getConcreteObjectColumns(
parts.begin(), parts.end(),
storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); });
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, std::move(object_columns));
}
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
size_t num_streams) override
{
query_plan.addStep(MergeTreeDataSelectExecutor(storage)
.readFromParts(
parts,
alter_conversions,
column_names,
storage_snapshot,
query_info,
context,
max_block_size,
num_streams,
nullptr,
analysis_result_ptr));
}
bool supportsPrewhere() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool supportsDynamicSubcolumns() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return storage.mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
NamesAndTypesList getVirtuals() const override
{
return storage.getVirtuals();
}
String getPartitionId() const
{
return partition_id;
}
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const
{
return storage.getPartitionIDFromQuery(ast, context);
}
bool materializeTTLRecalculateOnly() const
{
if (parts.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "parts must not be empty for materializeTTLRecalculateOnly");
return parts.front()->storage.getSettings()->materialize_ttl_recalculate_only;
}
bool hasLightweightDeletedMask() const override
{
return !parts.empty() && parts.front()->hasLightweightDelete();
}
bool supportsLightweightDelete() const override
{
return !parts.empty() && parts.front()->supportLightweightDeleteMutate();
}
private:
const MergeTreeData::DataPartsVector parts;
const std::vector<AlterConversionsPtr> alter_conversions;
const MergeTreeData & storage;
const String partition_id;
const MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr;
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{
auto table_id = part_->storage.getStorageID();
return StorageID(table_id.database_name, table_id.table_name + " (part " + part_->name + ")");
}
};
}
|