aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp
blob: 32626278bd45dd33dd0660ec0a337305e1883fa8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Storages/MeiliSearch/SinkMeiliSearch.h>
#include <base/JSON.h>
#include <base/types.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int MEILISEARCH_EXCEPTION;
}

/// Builds a sink for the given MeiliSearch configuration.
/// `sample_block_` is passed to the SinkToStorage base and also kept as `sample_block`;
/// `local_context_` is kept as `local_context`.
SinkMeiliSearch::SinkMeiliSearch(
    const MeiliSearchConfiguration & config_,
    const Block & sample_block_,
    ContextPtr local_context_)
    : SinkToStorage(sample_block_)
    , connection(config_)
    , local_context{local_context_}
    , sample_block{sample_block_}
{
}

// Narrows `view` to the JSON array stored under the "data" key of a document that was
// produced by the JSON output format, e.g. for
//     ... "data": [{...}, {...}, {...}], ...
// the view becomes `[{...}, {...}, {...}]` (both enclosing brackets included).
//
// Brackets that appear inside JSON string literals (including after escaped quotes) are
// not counted when looking for the closing bracket of the array.
//
// If the "data" key is missing, no '[' follows it, or the array is never closed,
// `view` is left empty instead of reading past the end of the buffer.
void extractData(std::string_view & view)
{
    static constexpr std::string_view data_key = "\"data\":";

    const size_t key_pos = view.find(data_key);
    const size_t array_begin
        = key_pos == std::string_view::npos ? std::string_view::npos : view.find('[', key_pos + data_key.size());
    if (array_begin == std::string_view::npos)
    {
        view = {};
        return;
    }

    size_t depth = 0;
    bool in_string = false;
    for (size_t pos = array_begin; pos < view.size(); ++pos)
    {
        const char ch = view[pos];
        if (in_string)
        {
            if (ch == '\\')
                ++pos; // the next character is escaped, it can neither close the string nor be a bracket
            else if (ch == '"')
                in_string = false;
        }
        else if (ch == '"')
        {
            in_string = true;
        }
        else if (ch == '[')
        {
            ++depth;
        }
        else if (ch == ']' && --depth == 0)
        {
            // Found the bracket matching the one at array_begin.
            view = view.substr(array_begin, pos - array_begin + 1);
            return;
        }
    }

    // Ran out of input before the array was closed.
    view = {};
}

void SinkMeiliSearch::writeBlockData(const Block & block) const
{
    FormatSettings settings = getFormatSettings(local_context);
    settings.json.quote_64bit_integers = false;
    WriteBufferFromOwnString buf;
    auto writer = FormatFactory::instance().getOutputFormat("JSON", buf, sample_block, local_context, settings);
    writer->write(block);
    writer->flush();
    writer->finalize();

    std::string_view vbuf(buf.str());
    extractData(vbuf);

    auto response = connection.updateQuery(vbuf);
    auto jres = JSON(response).begin();
    if (jres.getName() == "message")
        throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
}

/// Turns the incoming chunk into a block shaped like the sink header (taking over the
/// chunk's columns) and writes that block via writeBlockData().
void SinkMeiliSearch::consume(Chunk chunk)
{
    writeBlockData(getHeader().cloneWithColumns(chunk.detachColumns()));
}

}