aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Disks/getOrCreateDiskFromAST.cpp
blob: da318303f62484314bad20347ce793ea03f974ed (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <Disks/getOrCreateDiskFromAST.h>
#include <Common/logger_useful.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

namespace
{
    /// Turns a single `disk(...)` function node into the name of a custom disk known to `context`.
    ///
    /// The disk configuration is built from the function arguments. The disk name is the `name`
    /// key of that configuration when present, otherwise a generated one (see below). The disk is
    /// then fetched from, or created in, `context` under that name.
    ///
    /// Throws BAD_ARGUMENTS when:
    ///  - the disk obtained under that name is not marked as custom;
    ///  - the disk is not remote and `custom_local_disks_base_directory` is missing or empty in the config;
    ///  - the disk is not remote and its path does not start with that base directory.
    std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
    {
        const auto & disk_args = assert_cast<const ASTExpressionList *>(function.arguments.get())->children;
        auto disk_config = getDiskConfigurationFromAST(disk_args, context);

        std::string disk_name = [&]() -> std::string
        {
            if (disk_config->has("name"))
                return disk_config->getString("name");

            /// No explicit name was given. The generated name has to be unique, yet stable across
            /// table reattach and server restart, so it is derived from a hash of the serialized
            /// disk function AST.
            auto serialized_function = serializeAST(function);
            return DiskSelector::TMP_INTERNAL_DISK_PREFIX
                + toString(sipHash128(serialized_function.data(), serialized_function.size()));
        }();

        /// Invoked by the context only if it has to create the disk.
        const auto create_custom_disk = [&](const DisksMap & disks_map) -> DiskPtr
        {
            auto new_disk = DiskFactory::instance().create(disk_name, *disk_config, "", context, disks_map);
            /// A custom disk is allowed to be used without a storage policy.
            new_disk->markDiskAsCustom();
            return new_disk;
        };

        auto disk = context->getOrCreateDisk(disk_name, create_custom_disk);

        /// Disks created by the lambda above are always marked custom, so a non-custom disk here
        /// means the name is already taken by a disk that was not created this way.
        if (!disk->isCustomDisk())
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk with name `{}` already exist", disk_name);

        /// The remaining checks apply to non-remote disks only.
        if (disk->isRemote())
            return disk_name;

        static constexpr auto base_dir_config_key = "custom_local_disks_base_directory";
        auto allowed_base_dir = context->getConfigRef().getString(base_dir_config_key, "");

        if (allowed_base_dir.empty())
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Base path for custom local disks must be defined in config file by `{}`",
                base_dir_config_key);

        if (!pathStartsWith(disk->getPath(), allowed_base_dir))
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Path of the custom local disk must be inside `{}` directory",
                allowed_base_dir);

        return disk_name;
    }

    class DiskConfigurationFlattener
    {
    public:
        struct Data
        {
            ContextPtr context;
        };

        static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }

        static void visit(ASTPtr & ast, Data & data)
        {
            if (isDiskFunction(ast))
            {
                auto disk_name = getOrCreateDiskFromDiskAST(*ast->as<ASTFunction>(), data.context);
                ast = std::make_shared<ASTLiteral>(disk_name);
            }
        }
    };

    /// Visitor that runs DiskConfigurationFlattener over an AST.
    /// Visits children first (second template argument is `false`), so a disk function nested
    /// inside another disk function's arguments is replaced by its disk name before the
    /// enclosing function is processed.
    using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
}


std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context)
{
    if (!isDiskFunction(disk_function))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");

    auto ast = disk_function->clone();

    FlattenDiskConfigurationVisitor::Data data{context};
    FlattenDiskConfigurationVisitor{data}.visit(ast);

    auto disk_name = assert_cast<const ASTLiteral &>(*ast).value.get<String>();
    LOG_TRACE(&Poco::Logger::get("getOrCreateDiskFromDiskAST"), "Result disk name: {}", disk_name);
    return disk_name;
}

}