1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
#include "clickhouse_config.h"
#if USE_HDFS
#include <Databases/DatabaseHDFS.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
extern const int FILE_DOESNT_EXIST;
extern const int UNACCEPTABLE_URL;
extern const int ACCESS_DENIED;
extern const int DATABASE_ACCESS_DENIED;
extern const int HDFS_ERROR;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
/// Pattern for the database source: the literal prefix "hdfs://" followed by any number of
/// characters other than '/'. The constructor applies it with a full match, so a source
/// that contains a path component (any '/' after the prefix) is rejected.
static constexpr std::string_view HDFS_HOST_REGEXP = "^hdfs://[^/]*";
/// Creates the database object.
///
/// @param name_       Database name; also used to build the logger name.
/// @param source_url  HDFS host prefix that table names are resolved against. May be empty,
///                    in which case no validation is performed here.
/// @param context_    Context whose global context is stored and used for the remote host check.
///
/// Throws BAD_ARGUMENTS if a non-empty source does not fully match HDFS_HOST_REGEXP.
/// The remote host filter check may throw as well.
DatabaseHDFS::DatabaseHDFS(const String & name_, const String & source_url, ContextPtr context_)
    : IDatabase(name_)
    , WithContext(context_->getGlobalContext())
    , source(source_url)
    , log(&Poco::Logger::get("DatabaseHDFS(" + name_ + ")"))
{
    /// Nothing to validate when no source was given.
    if (source.empty())
        return;

    const std::string host_pattern(HDFS_HOST_REGEXP);
    const bool is_valid_host = re2::RE2::FullMatch(source, host_pattern);
    if (!is_valid_host)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs host: {}. "
            "It should have structure 'hdfs://<host_name>:<port>'", source);

    /// The host must also pass the server's remote host filter.
    context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI(source));
}
/// Registers `table_storage` in the loaded-tables map under `table_name`.
/// The map is modified while holding `mutex`.
/// Throws LOGICAL_ERROR if an entry with that name is already present.
void DatabaseHDFS::addTable(const std::string & table_name, StoragePtr table_storage) const
{
    std::lock_guard guard(mutex);

    const bool was_inserted = loaded_tables.emplace(table_name, table_storage).second;
    if (was_inserted)
        return;

    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "Table with name `{}` already exists in database `{}` (engine {})",
        table_name, getDatabaseName(), getEngineName());
}
/// Resolves a table name to a full HDFS URL.
///
/// - A name that already starts with "hdfs://" is returned unchanged.
/// - Otherwise the name is appended to the database `source`.
///
/// Throws BAD_ARGUMENTS if the name is not a full URL and the database has no source.
std::string DatabaseHDFS::getTablePath(const std::string & table_name) const
{
    if (table_name.starts_with("hdfs://"))
        return table_name;

    if (source.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}. "
            "It should have structure 'hdfs://<host_name>:<port>/path'", table_name);

    /// fs::path::operator/ replaces the left-hand side entirely when the right-hand side is
    /// an absolute path, so a name like "/dir/file" would silently drop `source`.
    /// Strip leading slashes to always resolve the name relative to `source`.
    const auto first_non_slash = table_name.find_first_not_of('/');
    const std::string relative_name
        = first_non_slash == std::string::npos ? std::string{} : table_name.substr(first_non_slash);

    return fs::path(source) / relative_name;
}
/// Validates `url` by running checkHDFSURL and the remote host filter of the global context.
///
/// @param throw_on_error  When true, any exception raised by the checks is rethrown as is;
///                        when false, it is swallowed and the function returns false.
/// @return true if both checks completed without throwing.
bool DatabaseHDFS::checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const
{
    try
    {
        checkHDFSURL(url);
        context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI(url));
        return true;
    }
    catch (...)
    {
        if (!throw_on_error)
            return false;
        throw;
    }
}
/// Reports whether a table with this name is known to the database.
/// Returns true if the name is already in the loaded-tables map; otherwise the result is
/// whether `name` passes checkUrl (without throwing). `mutex` is held for the whole call.
///
/// NOTE(review): `name` is validated as-is here, while getTableImpl resolves it through
/// getTablePath first — confirm whether names relative to `source` should be resolved too.
bool DatabaseHDFS::isTableExist(const String & name, ContextPtr context_) const
{
    std::lock_guard guard(mutex);

    const bool already_loaded = loaded_tables.find(name) != loaded_tables.end();
    return already_loaded ? true : checkUrl(name, context_, false);
}
/// Returns the storage for `name`, creating and caching it on first access.
///
/// The name is resolved to a URL with getTablePath, validated with checkUrl (which throws on
/// failure here), and then passed to the `hdfs` table function to build the storage.
///
/// @return the cached or newly created storage; nullptr if the table function could not be
///         obtained or produced no storage.
/// Exceptions from getTablePath, checkUrl and the table function propagate to the caller.
StoragePtr DatabaseHDFS::getTableImpl(const String & name, ContextPtr context_) const
{
    /// Check if the table exists in the loaded tables map.
    {
        std::lock_guard lock(mutex);
        if (auto it = loaded_tables.find(name); it != loaded_tables.end())
            return it->second;
    }

    auto url = getTablePath(name);
    checkUrl(url, context_, true);

    auto args = makeASTFunction("hdfs", std::make_shared<ASTLiteral>(url));
    auto table_function = TableFunctionFactory::instance().get(args, context_);
    if (!table_function)
        return nullptr;

    /// TableFunctionHDFS throws exceptions, if table cannot be created.
    auto table_storage = table_function->execute(args, context_, name);
    if (!table_storage)
        return nullptr;

    /// The mutex was not held while the storage was being created, so another thread may have
    /// cached the same table in the meantime. emplace() leaves an existing entry untouched;
    /// return whichever storage ended up in the map so that all callers share one instance
    /// instead of failing with LOGICAL_ERROR on the duplicate insert.
    std::lock_guard lock(mutex);
    return loaded_tables.emplace(name, table_storage).first->second;
}
/// Returns the storage for `name` or throws.
/// Exceptions raised inside getTableImpl are not intercepted, so the caller sees the
/// original error. If getTableImpl returns no storage, UNKNOWN_TABLE is thrown.
StoragePtr DatabaseHDFS::getTable(const String & name, ContextPtr context_) const
{
    auto found_storage = getTableImpl(name, context_);
    if (!found_storage)
        throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
            backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));

    return found_storage;
}
/// Non-throwing flavour of getTable for the "table is simply not there" cases.
/// Returns nullptr when getTableImpl fails with one of the error codes listed below or with
/// Poco::URISyntaxException; any other Exception is rethrown unchanged.
StoragePtr DatabaseHDFS::tryGetTable(const String & name, ContextPtr context_) const
{
    try
    {
        return getTableImpl(name, context_);
    }
    catch (const Exception & e)
    {
        /// These codes, thrown by TableFunctionHDFS, are treated as "there is no such table".
        const auto code = e.code();
        const bool table_is_absent
            = code == ErrorCodes::BAD_ARGUMENTS
            || code == ErrorCodes::ACCESS_DENIED
            || code == ErrorCodes::DATABASE_ACCESS_DENIED
            || code == ErrorCodes::FILE_DOESNT_EXIST
            || code == ErrorCodes::UNACCEPTABLE_URL
            || code == ErrorCodes::HDFS_ERROR
            || code == ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE;

        if (!table_is_absent)
            throw;

        return nullptr;
    }
    catch (const Poco::URISyntaxException &)
    {
        return nullptr;
    }
}
/// Returns true when the loaded-tables map holds no entries. The map is read under `mutex`.
bool DatabaseHDFS::empty() const
{
    std::lock_guard guard(mutex);
    const bool has_no_tables = loaded_tables.empty();
    return has_no_tables;
}
/// Builds the AST of a `CREATE DATABASE ... ENGINE = HDFS('<source>')` statement for this
/// database by formatting the statement text and parsing it with ParserCreateQuery.
/// If the database has a non-empty comment, it is attached to the resulting AST.
///
/// NOTE(review): `source` is placed between single quotes without escaping — confirm that it
/// can never contain a quote character.
ASTPtr DatabaseHDFS::getCreateDatabaseQuery() const
{
    const auto & settings = getContext()->getSettingsRef();

    const String create_statement
        = fmt::format("CREATE DATABASE {} ENGINE = HDFS('{}')", backQuoteIfNeed(getDatabaseName()), source);
    const char * const statement_begin = create_statement.data();
    const char * const statement_end = statement_begin + create_statement.size();

    ParserCreateQuery create_parser;
    ASTPtr create_ast = parseQuery(create_parser, statement_begin, statement_end, "", 0, settings.max_parser_depth);

    const auto comment_text = getDatabaseComment();
    if (!comment_text.empty())
    {
        auto & create_query = create_ast->as<ASTCreateQuery &>();
        create_query.set(create_query.comment, std::make_shared<ASTLiteral>(comment_text));
    }

    return create_ast;
}
/// Shuts down every cached table and then empties the cache.
/// The map is copied under `mutex` first so that flushAndShutdown() is called on each
/// storage without holding the lock; the lock is taken again only to clear the map.
void DatabaseHDFS::shutdown()
{
    Tables tables_snapshot;
    {
        std::lock_guard lock(mutex);
        tables_snapshot = loaded_tables;
    }

    for (const auto & kv : tables_snapshot)
        kv.second->flushAndShutdown();

    std::lock_guard lock(mutex);
    loaded_tables.clear();
}
/**
 * This database is read-only, so there are no tables to back up:
 * the result is always an empty vector, whatever filter or context is passed.
 */
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseHDFS::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const
{
    std::vector<std::pair<ASTPtr, StoragePtr>> nothing_to_backup;
    return nothing_to_backup;
}
/**
 * The database does not own any tables; it only caches storages for quick access.
 * Hence the iterator is always built over an empty table set, regardless of the arguments.
 */
DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
{
    Tables no_tables;
    return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(no_tables), getDatabaseName());
}
} // DB
#endif
|