aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/BridgeHelper/ExternalDictionaryLibraryBridgeHelper.cpp
blob: fcb8ebd1f22b728ee2d767c8707f2422f646b1db (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#include "ExternalDictionaryLibraryBridgeHelper.h"

#include <Formats/formatBlock.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
#include <base/range.h>
#include <Core/Field.h>
#include <Common/escapeForFileName.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int EXTERNAL_LIBRARY_ERROR;
    extern const int LOGICAL_ERROR;
}

ExternalDictionaryLibraryBridgeHelper::ExternalDictionaryLibraryBridgeHelper(
        ContextPtr context_,
        const Block & sample_block_,
        const Field & dictionary_id_,
        const LibraryInitData & library_data_)
    : LibraryBridgeHelper(context_->getGlobalContext())
    , sample_block(sample_block_)
    , library_data(library_data_)
    , dictionary_id(dictionary_id_)
{
}


Poco::URI ExternalDictionaryLibraryBridgeHelper::getPingURI() const
{
    auto uri = createBaseURI();
    uri.setPath(PING_HANDLER);
    uri.addQueryParameter("dictionary_id", toString(dictionary_id));
    return uri;
}


Poco::URI ExternalDictionaryLibraryBridgeHelper::getMainURI() const
{
    auto uri = createBaseURI();
    uri.setPath(MAIN_HANDLER);
    return uri;
}


Poco::URI ExternalDictionaryLibraryBridgeHelper::createRequestURI(const String & method) const
{
    auto uri = getMainURI();
    uri.addQueryParameter("version", std::to_string(LIBRARY_BRIDGE_PROTOCOL_VERSION));
    uri.addQueryParameter("dictionary_id", toString(dictionary_id));
    uri.addQueryParameter("method", method);
    return uri;
}


bool ExternalDictionaryLibraryBridgeHelper::bridgeHandShake()
{
    String result;
    try
    {
        ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts, credentials);
        readString(result, buf);
    }
    catch (...)
    {
        tryLogCurrentException(log);
        return false;
    }

    /*
     * When pinging bridge we also pass current dicionary_id. The bridge will check if there is such
     * dictionary. It is possible that such dictionary_id is not present only in two cases:
     * 1. It is dictionary source creation and initialization of library handler on bridge side did not happen yet.
     * 2. Bridge crashed or restarted for some reason while server did not.
    **/
    if (result.size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Unexpected message from library bridge: {}. "
                        "Check that bridge and server have the same version.", result);

    UInt8 dictionary_id_exists;
    auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
    if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Unexpected message from library bridge: {} ({}). "
                        "Check that bridge and server have the same version.",
                        result, parsed ? toString(dictionary_id_exists) : "failed to parse");

    LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
              toString(dictionary_id), toString(dictionary_id_exists), library_initialized);

    if (dictionary_id_exists && !library_initialized)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Library was not initialized, but bridge responded to already have dictionary id: {}",
                        dictionary_id);

    /// Here we want to say bridge to recreate a new library handler for current dictionary,
    /// because it responded to have lost it, but we know that it has already been created. (It is a direct result of bridge crash).
    if (!dictionary_id_exists && library_initialized)
    {
        LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
        bool reinitialized = false;
        try
        {
            auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
            reinitialized = executeRequest(uri, getInitLibraryCallback());
        }
        catch (...)
        {
            tryLogCurrentException(log);
            return false;
        }

        if (!reinitialized)
            throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR,
                            "Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
    }

    return true;
}


ReadWriteBufferFromHTTP::OutStreamCallback ExternalDictionaryLibraryBridgeHelper::getInitLibraryCallback() const
{
    /// Sample block must contain null values
    WriteBufferFromOwnString out;
    auto output_format = getContext()->getOutputFormat(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
    formatBlock(output_format, sample_block);
    auto block_string = out.str();

    return [block_string, this](std::ostream & os)
    {
        os << "library_path=" << escapeForFileName(library_data.library_path) << "&";
        os << "library_settings=" << escapeForFileName(library_data.library_settings) << "&";
        os << "attributes_names=" << escapeForFileName(library_data.dict_attributes) << "&";
        os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
        os << "null_values=" << escapeForFileName(block_string);
    };
}


bool ExternalDictionaryLibraryBridgeHelper::initLibrary()
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
    library_initialized = executeRequest(uri, getInitLibraryCallback());
    return library_initialized;
}


bool ExternalDictionaryLibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_LIB_CLONE_METHOD);
    uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
    /// We also pass initialization settings in order to create a library handler
    /// in case from_dictionary_id does not exist in bridge side (possible in case of bridge crash).
    library_initialized = executeRequest(uri, getInitLibraryCallback());
    return library_initialized;
}


bool ExternalDictionaryLibraryBridgeHelper::removeLibrary()
{
    /// Do not force bridge restart if it is not running in case of removeLibrary
    /// because in this case after restart it will not have this dictionaty id in memory anyway.
    if (bridgeHandShake())
    {
        auto uri = createRequestURI(EXT_DICT_LIB_DELETE_METHOD);
        return executeRequest(uri);
    }
    return true;
}


bool ExternalDictionaryLibraryBridgeHelper::isModified()
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_IS_MODIFIED_METHOD);
    return executeRequest(uri);
}


bool ExternalDictionaryLibraryBridgeHelper::supportsSelectiveLoad()
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_SUPPORTS_SELECTIVE_LOAD_METHOD);
    return executeRequest(uri);
}


QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadAll()
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_LOAD_ALL_METHOD);
    return QueryPipeline(loadBase(uri));
}


static String getDictIdsString(const std::vector<UInt64> & ids)
{
    WriteBufferFromOwnString out;
    writeVectorBinary(ids, out);
    return out.str();
}


QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_LOAD_IDS_METHOD);
    uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
    auto ids_string = getDictIdsString(ids);
    return QueryPipeline(loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; }));
}


QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadKeys(const Block & requested_block)
{
    startBridgeSync();
    auto uri = createRequestURI(EXT_DICT_LOAD_KEYS_METHOD);
    /// Sample block to parse block from callback
    uri.addQueryParameter("requested_block_sample", requested_block.getNamesAndTypesList().toString());
    ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
    {
        WriteBufferFromOStream out_buffer(os);
        auto output_format = getContext()->getOutputFormat(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, requested_block.cloneEmpty());
        formatBlock(output_format, requested_block);
        out_buffer.finalize();
    };
    return QueryPipeline(loadBase(uri, out_stream_callback));
}


bool ExternalDictionaryLibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
{
    ReadWriteBufferFromHTTP buf(
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        std::move(out_stream_callback),
        http_timeouts, credentials);

    bool res;
    readBoolText(res, buf);
    return res;
}


QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
{
    auto read_buf_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        std::move(out_stream_callback),
        http_timeouts,
        credentials,
        0,
        DBMS_DEFAULT_BUFFER_SIZE,
        getContext()->getReadSettings(),
        HTTPHeaderEntries{});

    auto source = FormatFactory::instance().getInput(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
    source->addBuffer(std::move(read_buf_ptr));
    return QueryPipeline(std::move(source));
}

}