aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Core/ExternalTable.cpp
blob: 58b705ca3175b17a9f532babec40ed8dc2112681 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include <boost/program_options.hpp>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>

#include <QueryPipeline/Pipe.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <base/scope_guard.h>


namespace DB
{

/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown below when the description of an external table is missing or malformed.
    extern const int BAD_ARGUMENTS;
}


/// Prepares the read buffer and the sample block, creates an input format over them
/// (format name is taken from `format`, block size from the `max_block_size` setting)
/// and returns it wrapped into a pipeline builder together with the table name.
ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
{
    /// The input format below is constructed from `read_buffer` and `sample_block`,
    /// so both have to be initialized first.
    initReadBuffer();
    initSampleBlock();

    const auto max_block_size = context->getSettingsRef().get("max_block_size").get<UInt64>();
    auto source = context->getInputFormat(format, *read_buffer, sample_block, max_block_size);

    auto result = std::make_unique<ExternalTableData>();
    result->table_name = name;
    result->pipe = std::make_unique<QueryPipelineBuilder>();
    result->pipe->init(Pipe(std::move(source)));

    return result;
}

/// Forgets everything known about the current table: releases the read buffer and
/// empties the sample block, the parsed structure and the name/file/format strings.
void BaseExternalTable::clear()
{
    /// The resets are independent of each other; the input is released first,
    /// then the fields that described it.
    read_buffer.reset();
    sample_block.clear();
    structure.clear();
    format.clear();
    file.clear();
    name.clear();
}

/// Appends to `structure` the (column name, type name) pairs listed in `argument`.
/// The argument is tokenized with splitInto on ' ' and ','; tokens are then consumed
/// two at a time as name followed by type.
/// Throws BAD_ARGUMENTS if the number of tokens is odd.
/// NOTE(review): since ' ' and ',' are the separators, a type name that itself contains
/// one of them would presumably be split into several tokens — confirm against splitInto.
void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
{
    std::vector<std::string> tokens;
    splitInto<' ', ','>(tokens, argument, true);

    const size_t token_count = tokens.size();
    if (token_count % 2 == 1)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Odd number of attributes in section structure: {}", token_count);

    /// token_count is even here, so every name has a matching type right after it.
    for (size_t pos = 0; pos + 1 < token_count; pos += 2)
    {
        const auto & column_name = tokens[pos];
        const auto & type_name = tokens[pos + 1];
        structure.emplace_back(column_name, type_name);
    }
}

/// Appends to `structure` one entry per type name listed in `argument`
/// (tokenized with splitInto on ' ' and ','). Columns have no user-provided names
/// in this form, so they are named "_1", "_2", ... in the order of appearance.
void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
{
    std::vector<std::string> type_names;
    splitInto<' ', ','>(type_names, argument, true);

    /// 1-based position of the current type; used to generate the column name.
    size_t ordinal = 0;
    for (const auto & type_name : type_names)
    {
        ++ordinal;
        structure.emplace_back("_" + toString(ordinal), type_name);
    }
}

/// Builds `sample_block` from `structure`: for every (name, type name) pair the type is
/// resolved through DataTypeFactory and an empty column of that type is inserted.
///
/// The function is idempotent: if the sample block already contains columns it is left
/// untouched. getData() calls this unconditionally, so without the check a second getData()
/// on the same object (with no clear() in between) would append every column once more.
/// clear() empties the sample block, after which it is rebuilt on the next call.
void BaseExternalTable::initSampleBlock()
{
    if (sample_block.columns() != 0)
        return;

    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    for (const auto & [column_name, type_name] : structure)
    {
        ColumnWithTypeAndName column;
        column.name = column_name;
        column.type = data_type_factory.get(type_name);
        /// Sample block carries no data, only names and types.
        column.column = column.type->createColumn();
        sample_block.insert(std::move(column));
    }
}


/// Creates `read_buffer` for the table input: a file value of "-" selects standard input
/// (STDIN_FILENO), any other value is passed to ReadBufferFromFile.
void ExternalTable::initReadBuffer()
{
    const bool read_from_stdin = (file == "-");

    if (!read_from_stdin)
    {
        read_buffer = std::make_unique<ReadBufferFromFile>(file);
        return;
    }

    read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
}

/// Initializes the table description from parsed command-line options.
/// "file", "name" and "format" are mandatory; the column description comes from
/// "structure" if present, otherwise from "types".
/// Throws BAD_ARGUMENTS as soon as a required option is found to be missing
/// (options are checked in the order: file, name, format, structure/types).
ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
{
    if (!external_options.count("file"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--file field have not been provided for external table");
    file = external_options["file"].as<std::string>();

    if (!external_options.count("name"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--name field have not been provided for external table");
    name = external_options["name"].as<std::string>();

    if (!external_options.count("format"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--format field have not been provided for external table");
    format = external_options["format"].as<std::string>();

    /// "structure" takes precedence: when it is given, "types" is not looked at.
    if (external_options.count("structure"))
    {
        parseStructureFromStructureField(external_options["structure"].as<std::string>());
        return;
    }

    if (!external_options.count("types"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Neither --structure nor --types have not been provided for external table");

    parseStructureFromTypesField(external_options["types"].as<std::string>());
}


/// Handles one part of a multipart request as an external table: reads the table
/// description from the part header and from `params`, creates a temporary table,
/// registers it in the context and writes the part body into it.
///
/// `header` - header of the part; its "Content-Disposition" field supplies the table name.
/// `stream` - body of the part, i.e. the table data in the declared format.
///
/// Throws BAD_ARGUMENTS if neither `<name>_structure` nor `<name>_types` is present in `params`.
/// NOTE(review): `params` is a member declared outside this file; presumably it holds the
/// request parameters — confirm against the class declaration.
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
{
    /// After finishing this function we will be ready to receive the next file, for this we clear all the information received.
    /// We should use SCOPE_EXIT because read_buffer should be reset correctly if there will be an exception.
    SCOPE_EXIT(clear());

    const Settings & settings = getContext()->getSettingsRef();

    /// A non-zero http_max_multipart_form_data_size puts a size limit on the part:
    /// the stream is wrapped into a LimitReadBuffer configured to throw.
    /// Otherwise the stream is used as is.
    if (settings.http_max_multipart_form_data_size)
        read_buffer = std::make_unique<LimitReadBuffer>(
            stream, settings.http_max_multipart_form_data_size,
            /* throw_exception */ true, /* exact_limit */ std::optional<size_t>(),
            "the maximum size of multipart/form-data. "
            "This limit can be tuned by 'http_max_multipart_form_data_size' setting");
    else
        read_buffer = wrapReadBufferReference(stream);

    /// Retrieve a collection of parameters from MessageHeader
    Poco::Net::NameValueCollection content;
    std::string label;
    Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);

    /// Get parameters
    /// The table name falls back to "_data" and the format to "TabSeparated" when not given.
    name = content.get("name", "_data");
    format = params.get(name + "_format", "TabSeparated");

    /// Column description: `<name>_structure` is preferred, `<name>_types` is the fallback.
    if (params.has(name + "_structure"))
        parseStructureFromStructureField(params.get(name + "_structure"));
    else if (params.has(name + "_types"))
        parseStructureFromTypesField(params.get(name + "_types"));
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
                        "Neither structure nor types have not been provided for external table {}. "
                        "Use fields {}_structure or {}_types to do so.", name, name, name);

    /// Builds the input pipeline over read_buffer; also fills sample_block used below.
    ExternalTableDataPtr data = getData(getContext());

    /// Create table
    NamesAndTypesList columns = sample_block.getNamesAndTypesList();
    auto temporary_table = TemporaryTableHolder(getContext(), ColumnsDescription{columns}, {});
    /// The storage pointer is taken before the holder is moved into the context.
    auto storage = temporary_table.getTable();
    getContext()->addExternalTable(data->table_name, std::move(temporary_table));
    auto sink = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext(), /*async_insert=*/false);

    /// Write data
    /// The input pipeline is completed with the table sink and executed on a single thread.
    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*data->pipe));
    pipeline.complete(std::move(sink));
    pipeline.setNumThreads(1);

    CompletedPipelineExecutor executor(pipeline);
    executor.execute();
}

}