1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <Disks/SingleDiskVolume.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
DataPartStorageOnDiskFull::DataPartStorageOnDiskFull(VolumePtr volume_, std::string root_path_, std::string part_dir_)
: DataPartStorageOnDiskBase(std::move(volume_), std::move(root_path_), std::move(part_dir_))
{
}
DataPartStorageOnDiskFull::DataPartStorageOnDiskFull(
VolumePtr volume_, std::string root_path_, std::string part_dir_, DiskTransactionPtr transaction_)
: DataPartStorageOnDiskBase(std::move(volume_), std::move(root_path_), std::move(part_dir_), std::move(transaction_))
{
}
MutableDataPartStoragePtr DataPartStorageOnDiskFull::create(
VolumePtr volume_, std::string root_path_, std::string part_dir_, bool /*initialize_*/) const
{
return std::make_shared<DataPartStorageOnDiskFull>(std::move(volume_), std::move(root_path_), std::move(part_dir_));
}
MutableDataPartStoragePtr DataPartStorageOnDiskFull::getProjection(const std::string & name, bool use_parent_transaction) // NOLINT
{
return std::shared_ptr<DataPartStorageOnDiskFull>(new DataPartStorageOnDiskFull(volume, std::string(fs::path(root_path) / part_dir), name, use_parent_transaction ? transaction : nullptr));
}
DataPartStoragePtr DataPartStorageOnDiskFull::getProjection(const std::string & name) const
{
return std::make_shared<DataPartStorageOnDiskFull>(volume, std::string(fs::path(root_path) / part_dir), name);
}
bool DataPartStorageOnDiskFull::exists() const
{
return volume->getDisk()->exists(fs::path(root_path) / part_dir);
}
bool DataPartStorageOnDiskFull::exists(const std::string & name) const
{
return volume->getDisk()->exists(fs::path(root_path) / part_dir / name);
}
bool DataPartStorageOnDiskFull::isDirectory(const std::string & name) const
{
return volume->getDisk()->isDirectory(fs::path(root_path) / part_dir / name);
}
class DataPartStorageIteratorOnDisk final : public IDataPartStorageIterator
{
public:
DataPartStorageIteratorOnDisk(DiskPtr disk_, DirectoryIteratorPtr it_)
: disk(std::move(disk_)), it(std::move(it_))
{
}
void next() override { it->next(); }
bool isValid() const override { return it->isValid(); }
bool isFile() const override { return isValid() && disk->isFile(it->path()); }
std::string name() const override { return it->name(); }
std::string path() const override { return it->path(); }
private:
DiskPtr disk;
DirectoryIteratorPtr it;
};
DataPartStorageIteratorPtr DataPartStorageOnDiskFull::iterate() const
{
return std::make_unique<DataPartStorageIteratorOnDisk>(
volume->getDisk(),
volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir));
}
Poco::Timestamp DataPartStorageOnDiskFull::getFileLastModified(const String & file_name) const
{
return volume->getDisk()->getLastModified(fs::path(root_path) / part_dir / file_name);
}
size_t DataPartStorageOnDiskFull::getFileSize(const String & file_name) const
{
return volume->getDisk()->getFileSize(fs::path(root_path) / part_dir / file_name);
}
UInt32 DataPartStorageOnDiskFull::getRefCount(const String & file_name) const
{
return volume->getDisk()->getRefCount(fs::path(root_path) / part_dir / file_name);
}
std::string DataPartStorageOnDiskFull::getRemotePath(const std::string & file_name) const
{
auto objects = volume->getDisk()->getStorageObjects(fs::path(root_path) / part_dir / file_name);
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "One file must be mapped to one object on blob storage in MergeTree tables");
return objects[0].remote_path;
}
String DataPartStorageOnDiskFull::getUniqueId() const
{
auto disk = volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk {} doesn't support zero-copy replication", disk->getName());
return disk->getUniqueId(fs::path(getRelativePath()) / "checksums.txt");
}
std::unique_ptr<ReadBufferFromFileBase> DataPartStorageOnDiskFull::readFile(
const std::string & name,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
return volume->getDisk()->readFile(fs::path(root_path) / part_dir / name, settings, read_hint, file_size);
}
std::unique_ptr<WriteBufferFromFileBase> DataPartStorageOnDiskFull::writeFile(
const String & name,
size_t buf_size,
WriteMode mode,
const WriteSettings & settings)
{
if (transaction)
return transaction->writeFile(fs::path(root_path) / part_dir / name, buf_size, mode, settings, /* autocommit = */ false);
else
return volume->getDisk()->writeFile(fs::path(root_path) / part_dir / name, buf_size, mode, settings);
}
void DataPartStorageOnDiskFull::createFile(const String & name)
{
executeWriteOperation([&](auto & disk) { disk.createFile(fs::path(root_path) / part_dir / name); });
}
void DataPartStorageOnDiskFull::moveFile(const String & from_name, const String & to_name)
{
executeWriteOperation([&](auto & disk)
{
auto relative_path = fs::path(root_path) / part_dir;
disk.moveFile(relative_path / from_name, relative_path / to_name);
});
}
void DataPartStorageOnDiskFull::replaceFile(const String & from_name, const String & to_name)
{
executeWriteOperation([&](auto & disk)
{
auto relative_path = fs::path(root_path) / part_dir;
disk.replaceFile(relative_path / from_name, relative_path / to_name);
});
}
void DataPartStorageOnDiskFull::removeFile(const String & name)
{
executeWriteOperation([&](auto & disk) { disk.removeFile(fs::path(root_path) / part_dir / name); });
}
void DataPartStorageOnDiskFull::removeFileIfExists(const String & name)
{
executeWriteOperation([&](auto & disk) { disk.removeFileIfExists(fs::path(root_path) / part_dir / name); });
}
void DataPartStorageOnDiskFull::createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to)
{
const auto * source_on_disk = typeid_cast<const DataPartStorageOnDiskFull *>(&source);
if (!source_on_disk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create hardlink from different storage. Expected DataPartStorageOnDiskFull, got {}",
typeid(source).name());
executeWriteOperation([&](auto & disk)
{
disk.createHardLink(
fs::path(source_on_disk->getRelativePath()) / from,
fs::path(root_path) / part_dir / to);
});
}
void DataPartStorageOnDiskFull::createProjection(const std::string & name)
{
executeWriteOperation([&](auto & disk) { disk.createDirectory(fs::path(root_path) / part_dir / name); });
}
void DataPartStorageOnDiskFull::beginTransaction()
{
if (transaction)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Uncommitted{}transaction already exists", has_shared_transaction ? " shared " : " ");
transaction = volume->getDisk()->createTransaction();
}
void DataPartStorageOnDiskFull::commitTransaction()
{
if (!transaction)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no uncommitted transaction");
if (has_shared_transaction)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot commit shared transaction");
transaction->commit();
transaction.reset();
}
}
|