aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/HDFS/HDFSCommon.h
blob: 251d2f931312b4cd9dde8034d00fdd6ba99edbff (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#pragma once

#include "clickhouse_config.h"

#if USE_HDFS
#include <memory>
#include <type_traits>
#include <vector>

#error #include <hdfs/hdfs.h>
#include <base/types.h>

#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>


namespace DB
{

namespace detail
{
    /// Deleter functor for `hdfsFS` handles: releasing a handle means
    /// passing it to hdfsDisconnect(). The return value of hdfsDisconnect()
    /// is not inspected.
    struct HDFSFsDeleter
    {
        void operator()(hdfsFS fs_ptr) { hdfsDisconnect(fs_ptr); }
    };
}


struct HDFSFileInfo
{
    hdfsFileInfo * file_info;
    int length;

    HDFSFileInfo() : file_info(nullptr) , length(0) {}

    HDFSFileInfo(const HDFSFileInfo & other) = delete;
    HDFSFileInfo(HDFSFileInfo && other) = default;
    HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
    HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
    ~HDFSFileInfo();
};


class HDFSBuilderWrapper
{

friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);

static const String CONFIG_PREFIX;

public:
    HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {}

    ~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }

    HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
    HDFSBuilderWrapper & operator=(const HDFSBuilderWrapper &) = delete;

    HDFSBuilderWrapper(HDFSBuilderWrapper && other) noexcept
    {
        *this = std::move(other);
    }

    HDFSBuilderWrapper & operator=(HDFSBuilderWrapper && other) noexcept
    {
        std::swap(hdfs_builder, other.hdfs_builder);
        config_stor = std::move(other.config_stor);
        hadoop_kerberos_keytab = std::move(other.hadoop_kerberos_keytab);
        hadoop_kerberos_principal = std::move(other.hadoop_kerberos_principal);
        hadoop_security_kerberos_ticket_cache_path = std::move(other.hadoop_security_kerberos_ticket_cache_path);
        need_kinit = std::move(other.need_kinit);
        return *this;
    }

    hdfsBuilder * get() { return hdfs_builder; }

private:
    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);

    // hdfs builder relies on an external config data storage
    std::pair<String, String>& keep(const String & k, const String & v)
    {
        return config_stor.emplace_back(std::make_pair(k, v));
    }

    hdfsBuilder * hdfs_builder;
    std::vector<std::pair<String, String>> config_stor;

    #if USE_KRB5
    void runKinit();
    String hadoop_kerberos_keytab;
    String hadoop_kerberos_principal;
    String hadoop_security_kerberos_ticket_cache_path;
    bool need_kinit{false};
    #endif // USE_KRB5
};

/// Owning smart pointer for an `hdfsFS` handle; a non-null handle is released
/// through detail::HDFSFsDeleter, i.e. by calling hdfsDisconnect() on it.
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;


/// Creates an HDFSBuilderWrapper for `uri_str` using the given configuration.
/// Original note: set read/connect timeout; the default value in libhdfs3 is about 1 hour, which is too large.
/// NOTE(review): presumably the timeouts are set inside the implementation — confirm there.
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
/// Creates an HDFSFSPtr from the raw `builder`.
/// NOTE(review): presumably connects to the file system described by `builder` — confirm against the definition.
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);


/// NOTE(review): presumably returns the name node URL part of `hdfs_url` — confirm against the definition.
String getNameNodeUrl(const String & hdfs_url);
/// NOTE(review): presumably returns the name node cluster part of `hdfs_url` — confirm against the definition.
String getNameNodeCluster(const String & hdfs_url);

/// Checks that `url` satisfies the structure 'hdfs://<host_name>:<port>/<path>'
/// and throws an exception if it doesn't.
void checkHDFSURL(const String & url);

}
#endif