1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
#include "Storages/MergeTree/FutureMergedMutatedPart.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
{
if (parts_.empty())
return;
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
MergeTreeDataPartType future_part_type;
MergeTreeDataPartStorageType future_part_storage_type;
for (const auto & part : parts_)
{
sum_rows += part->rows_count;
sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
future_part_type = std::min(future_part_type, part->getType());
future_part_storage_type = std::min(future_part_storage_type, part->getDataPartStorage().getType());
}
auto chosen_format = parts_.front()->storage.choosePartFormatOnDisk(sum_bytes_uncompressed, sum_rows);
future_part_type = std::min(future_part_type, chosen_format.part_type);
future_part_storage_type = std::min(future_part_storage_type, chosen_format.storage_type);
assign(std::move(parts_), {future_part_type, future_part_storage_type});
}
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartFormat future_part_format)
{
if (parts_.empty())
return;
for (const MergeTreeData::DataPartPtr & part : parts_)
{
const MergeTreeData::DataPartPtr & first_part = parts_.front();
if (part->partition.value != first_part->partition.value)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempting to merge parts {} and {} that are in different partitions",
first_part->name, part->name);
}
parts = std::move(parts_);
UInt32 max_level = 0;
Int64 max_mutation = 0;
for (const auto & part : parts)
{
max_level = std::max(max_level, part->info.level);
max_mutation = std::max(max_mutation, part->info.mutation);
}
part_format = future_part_format;
part_info.partition_id = parts.front()->info.partition_id;
part_info.min_block = parts.front()->info.min_block;
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
part_info.mutation = max_mutation;
if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
for (const auto & part : parts)
{
/// NOTE: getting min and max dates from part names (instead of part data) because we want
/// the merged part name be determined only by source part names.
/// It is simpler this way when the real min and max dates for the block range can change
/// (e.g. after an ALTER DELETE command).
DayNum part_min_date;
DayNum part_max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(part->name, part_min_date, part_max_date);
min_date = std::min(min_date, part_min_date);
max_date = std::max(max_date, part_max_date);
}
name = part_info.getPartNameV0(min_date, max_date);
}
else
name = part_info.getPartNameV1();
}
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const IReservation * reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}
}
|