1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#pragma once
#include "clickhouse_config.h"
#if USE_HDFS
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
#error #include <hdfs/hdfs.h>
#include <base/types.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace detail
{
/// Deleter plugged into HDFSFSPtr: hands the filesystem handle it receives to hdfsDisconnect().
struct HDFSFsDeleter
{
    void operator()(hdfsFS fs) { hdfsDisconnect(fs); }
};
} // namespace detail
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo() : file_info(nullptr) , length(0) {}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo();
};
class HDFSBuilderWrapper
{
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
static const String CONFIG_PREFIX;
public:
HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {}
~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper & operator=(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper && other) noexcept
{
*this = std::move(other);
}
HDFSBuilderWrapper & operator=(HDFSBuilderWrapper && other) noexcept
{
std::swap(hdfs_builder, other.hdfs_builder);
config_stor = std::move(other.config_stor);
hadoop_kerberos_keytab = std::move(other.hadoop_kerberos_keytab);
hadoop_kerberos_principal = std::move(other.hadoop_kerberos_principal);
hadoop_security_kerberos_ticket_cache_path = std::move(other.hadoop_security_kerberos_ticket_cache_path);
need_kinit = std::move(other.need_kinit);
return *this;
}
hdfsBuilder * get() { return hdfs_builder; }
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);
// hdfs builder relies on an external config data storage
std::pair<String, String>& keep(const String & k, const String & v)
{
return config_stor.emplace_back(std::make_pair(k, v));
}
hdfsBuilder * hdfs_builder;
std::vector<std::pair<String, String>> config_stor;
#if USE_KRB5
void runKinit();
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_security_kerberos_ticket_cache_path;
bool need_kinit{false};
#endif // USE_KRB5
};
/// Owning handle to an HDFS filesystem. When it releases a non-null handle,
/// detail::HDFSFsDeleter passes that handle to hdfsDisconnect().
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// Builds an HDFS builder for `uri_str`. Read/connect timeouts are set explicitly, because
// the libhdfs3 default (about 1 hour) is too large.
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
/// Returns an owning HDFSFSPtr for a filesystem obtained from `builder`
/// (presumably via hdfsBuilderConnect; NOTE(review): confirm in the definition).
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
/// Presumably extracts the namenode URL part of `hdfs_url`; NOTE(review): confirm in the definition.
String getNameNodeUrl(const String & hdfs_url);
/// Presumably extracts the namenode cluster name from `hdfs_url`; NOTE(review): confirm in the definition.
String getNameNodeCluster(const String & hdfs_url);
/// Checks that the url matches the structure 'hdfs://<host_name>:<port>/<path>'
/// and throws an exception if it doesn't.
void checkHDFSURL(const String & url);
}
#endif
|