aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MeiliSearch
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-14 09:58:56 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/MeiliSearch
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
downloadydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/MeiliSearch')
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp87
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h24
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp126
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h51
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp65
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h28
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp231
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h53
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp201
-rw-r--r--contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h41
10 files changed, 907 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp
new file mode 100644
index 0000000000..bbbca00937
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp
@@ -0,0 +1,87 @@
+#include <memory>
+#include <string>
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/Serializations/ISerialization.h>
+#include <IO/ReadHelpers.h>
+#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
+#include <base/JSON.h>
+#include <base/types.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int CANNOT_READ_ALL_DATA;
+ extern const int MEILISEARCH_EXCEPTION;
+}
+
+MeiliSearchColumnDescriptionFetcher::MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config) : connection(config)
+{
+}
+
+void MeiliSearchColumnDescriptionFetcher::addParam(const String & key, const String & val)
+{
+ query_params[key] = val;
+}
+
+bool checkIfInteger(const String & s)
+{
+ return s.find('.') == String::npos;
+}
+
+DataTypePtr parseTypeOfField(JSON ptr)
+{
+ if (ptr.isString())
+ {
+ return std::make_shared<DataTypeString>();
+ }
+ if (ptr.isArray())
+ {
+ auto nested_type = parseTypeOfField(ptr.begin());
+ return std::make_shared<DataTypeArray>(nested_type);
+ }
+ if (ptr.isBool())
+ {
+ return std::make_shared<DataTypeUInt8>();
+ }
+ if (ptr.isNull())
+ {
+ DataTypePtr res = std::make_shared<DataTypeNullable>(res);
+ return res;
+ }
+ if (ptr.isNumber())
+ {
+ if (checkIfInteger(ptr.toString()))
+ {
+ return std::make_shared<DataTypeInt64>();
+ }
+ return std::make_shared<DataTypeFloat64>();
+ }
+ return std::make_shared<DataTypeString>();
+}
+
+ColumnsDescription MeiliSearchColumnDescriptionFetcher::fetchColumnsDescription() const
+{
+ auto response = connection.searchQuery(query_params);
+ JSON jres = JSON(response).begin();
+
+ if (jres.getName() == "message")
+ throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
+
+ NamesAndTypesList list;
+
+ for (const JSON kv_pair : jres.getValue().begin())
+ {
+ if (!kv_pair.isNameValuePair())
+ throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Bad response data");
+
+ list.emplace_back(kv_pair.getName(), parseTypeOfField(kv_pair.getValue()));
+ }
+
+ return ColumnsDescription(list);
+}
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h
new file mode 100644
index 0000000000..19b40251d9
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <unordered_map>
+#include <Storages/ColumnsDescription.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+#include <base/types.h>
+
+namespace DB
+{
+class MeiliSearchColumnDescriptionFetcher
+{
+public:
+ explicit MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config);
+
+ void addParam(const String & key, const String & val);
+
+ ColumnsDescription fetchColumnsDescription() const;
+
+private:
+ std::unordered_map<String, String> query_params;
+ MeiliSearchConnection connection;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp
new file mode 100644
index 0000000000..e34688a636
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp
@@ -0,0 +1,126 @@
+#include <sstream>
+#include <string_view>
+#include <IO/Operators.h>
+#include <IO/WriteBufferFromString.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+#include <Common/Exception.h>
+
+#include <Poco/StreamCopier.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int NETWORK_ERROR;
+}
+
+MeiliSearchConnection::MeiliSearchConnection(const MeiliConfig & conf) : config{conf}
+{
+ Poco::URI uri(config.connection_string);
+ session.setHost(uri.getHost());
+ session.setPort(uri.getPort());
+}
+
+String MeiliSearchConnection::execPostQuery(const String & url, std::string_view post_fields) const
+{
+ Poco::URI uri(url);
+
+ String path(uri.getPathAndQuery());
+ if (path.empty())
+ path = "/";
+
+ Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_POST, path, Poco::Net::HTTPMessage::HTTP_1_1);
+ req.setContentType("application/json");
+
+ if (!config.key.empty())
+ req.add("Authorization", "Bearer " + config.key);
+
+ req.setContentLength(post_fields.length());
+
+ std::ostream & os = session.sendRequest(req);
+ os << post_fields;
+
+ Poco::Net::HTTPResponse res;
+ std::istream & is = session.receiveResponse(res);
+
+ // need to separate MeiliSearch response from other situations
+ // in order to handle it properly
+ if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4)
+ {
+ String response_buffer;
+ Poco::StreamCopier::copyToString(is, response_buffer);
+ return response_buffer;
+ }
+ else
+ throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, res.getReason());
+}
+
+String MeiliSearchConnection::execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const
+{
+ Poco::URI uri(url);
+ for (const auto & kv : query_params)
+ {
+ uri.addQueryParameter(kv.first, kv.second);
+ }
+
+ String path(uri.getPathAndQuery());
+ if (path.empty())
+ path = "/";
+
+ Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, path, Poco::Net::HTTPMessage::HTTP_1_1);
+
+ if (!config.key.empty())
+ req.add("Authorization", "Bearer " + config.key);
+
+ session.sendRequest(req);
+
+ Poco::Net::HTTPResponse res;
+ std::istream & is = session.receiveResponse(res);
+
+ // need to separate MeiliSearch response from other situations
+ // in order to handle it properly
+ if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4)
+ {
+ String response_buffer;
+ Poco::StreamCopier::copyToString(is, response_buffer);
+ return response_buffer;
+ }
+ else
+ throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, res.getReason());
+}
+
+
+String MeiliSearchConnection::searchQuery(const std::unordered_map<String, String> & query_params) const
+{
+ WriteBufferFromOwnString post_fields;
+
+ post_fields << "{";
+
+ auto it = query_params.begin();
+ while (it != query_params.end())
+ {
+ post_fields << it->first << ":" << it->second;
+ ++it;
+ if (it != query_params.end())
+ post_fields << ",";
+ }
+
+ post_fields << "}";
+
+ String url = config.connection_string + "search";
+ return execPostQuery(url, post_fields.str());
+}
+
+String MeiliSearchConnection::updateQuery(std::string_view data) const
+{
+ String url = config.connection_string + "documents";
+ return execPostQuery(url, data);
+}
+
+String MeiliSearchConnection::getDocumentsQuery(const std::unordered_map<String, String> & query_params) const
+{
+ String url = config.connection_string + "documents";
+ return execGetQuery(url, query_params);
+}
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h
new file mode 100644
index 0000000000..19083985eb
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <base/types.h>
+
+#include <Poco/Exception.h>
+#include <Poco/Net/HTTPClientSession.h>
+#include <Poco/Net/HTTPRequest.h>
+#include <Poco/Net/HTTPResponse.h>
+#include <Poco/Path.h>
+#include <Poco/URI.h>
+
+namespace DB
+{
+struct MeiliSearchConfiguration
+{
+ String key;
+ String index;
+ String connection_string;
+
+ MeiliSearchConfiguration(const String & url_, const String & index_, const String & key_) : key{key_}, index{index_}
+ {
+ connection_string = url_ + "/indexes/" + index_ + "/";
+ }
+};
+
+using MeiliConfig = MeiliSearchConfiguration;
+
+class MeiliSearchConnection
+{
+public:
+ explicit MeiliSearchConnection(const MeiliConfig & config);
+
+ String searchQuery(const std::unordered_map<String, String> & query_params) const;
+
+ String getDocumentsQuery(const std::unordered_map<String, String> & query_params) const;
+
+ String updateQuery(std::string_view data) const;
+
+private:
+ String execPostQuery(const String & url, std::string_view post_fields) const;
+
+ String execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const;
+
+ MeiliConfig config;
+ mutable Poco::Net::HTTPClientSession session;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp
new file mode 100644
index 0000000000..32626278bd
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp
@@ -0,0 +1,65 @@
+#include <Formats/FormatFactory.h>
+#include <IO/WriteBufferFromString.h>
+#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
+#include <Storages/MeiliSearch/SinkMeiliSearch.h>
+#include <base/JSON.h>
+#include <base/types.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int MEILISEARCH_EXCEPTION;
+}
+
+SinkMeiliSearch::SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_)
+ : SinkToStorage(sample_block_), connection(config_), local_context{local_context_}, sample_block{sample_block_}
+{
+}
+
+// gets the content of the json data section, which was obtained using the JSON format output
+// "data": [{...}, {...}, {...}]
+void extractData(std::string_view & view)
+{
+ size_t ind = view.find("\"data\":");
+ while (view[ind] != '[')
+ ++ind;
+ view.remove_prefix(ind);
+ size_t bal = ind = 1;
+ while (bal > 0)
+ {
+ if (view[ind] == '[')
+ ++bal;
+ else if (view[ind] == ']')
+ --bal;
+ ++ind;
+ }
+ view.remove_suffix(view.size() - ind);
+}
+
+void SinkMeiliSearch::writeBlockData(const Block & block) const
+{
+ FormatSettings settings = getFormatSettings(local_context);
+ settings.json.quote_64bit_integers = false;
+ WriteBufferFromOwnString buf;
+ auto writer = FormatFactory::instance().getOutputFormat("JSON", buf, sample_block, local_context, settings);
+ writer->write(block);
+ writer->flush();
+ writer->finalize();
+
+ std::string_view vbuf(buf.str());
+ extractData(vbuf);
+
+ auto response = connection.updateQuery(vbuf);
+ auto jres = JSON(response).begin();
+ if (jres.getName() == "message")
+ throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
+}
+
+void SinkMeiliSearch::consume(Chunk chunk)
+{
+ auto block = getHeader().cloneWithColumns(chunk.detachColumns());
+ writeBlockData(block);
+}
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h
new file mode 100644
index 0000000000..9554a33683
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <Core/ExternalResultDescription.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/Sinks/SinkToStorage.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+
+namespace DB
+{
+class SinkMeiliSearch : public SinkToStorage
+{
+public:
+ SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_);
+
+ String getName() const override { return "SinkMeiliSearch"; }
+
+ void consume(Chunk chunk) override;
+
+ void writeBlockData(const Block & block) const;
+
+private:
+ MeiliSearchConnection connection;
+ ContextPtr local_context;
+ Block sample_block;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp
new file mode 100644
index 0000000000..f567af23a1
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp
@@ -0,0 +1,231 @@
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnVector.h>
+#include <Columns/ColumnsNumber.h>
+#include <Columns/IColumn.h>
+#include <Core/ExternalResultDescription.h>
+#include <Core/Field.h>
+#include <Core/Types.h>
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeDateTime.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/Serializations/ISerialization.h>
+#include <IO/Operators.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteBufferFromString.h>
+#include <IO/WriteHelpers.h>
+#include <Storages/MeiliSearch/SourceMeiliSearch.h>
+#include <base/JSON.h>
+#include <base/range.h>
+#include <base/types.h>
+#include <magic_enum.hpp>
+#include <Common/Exception.h>
+#include <Common/quoteString.h>
+#include "Interpreters/ProcessList.h"
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int MEILISEARCH_EXCEPTION;
+ extern const int UNSUPPORTED_MEILISEARCH_TYPE;
+ extern const int MEILISEARCH_MISSING_SOME_COLUMNS;
+}
+
+String MeiliSearchSource::doubleQuoteIfNeed(const String & param) const
+{
+ if (route == QueryRoute::search)
+ return doubleQuoteString(param);
+ return param;
+}
+
+String MeiliSearchSource::constructAttributesToRetrieve() const
+{
+ WriteBufferFromOwnString columns_to_get;
+
+ if (route == QueryRoute::search)
+ columns_to_get << "[";
+
+ auto it = description.sample_block.begin();
+ while (it != description.sample_block.end())
+ {
+ columns_to_get << doubleQuoteIfNeed(it->name);
+ ++it;
+ if (it != description.sample_block.end())
+ columns_to_get << ",";
+ }
+
+ if (route == QueryRoute::search)
+ columns_to_get << "]";
+
+ return columns_to_get.str();
+}
+
+MeiliSearchSource::MeiliSearchSource(
+ const MeiliSearchConfiguration & config,
+ const Block & sample_block,
+ UInt64 max_block_size_,
+ QueryRoute route_,
+ std::unordered_map<String, String> query_params_)
+ : ISource(sample_block.cloneEmpty())
+ , connection(config)
+ , max_block_size{max_block_size_}
+ , route{route_}
+ , query_params{query_params_}
+ , offset{0}
+{
+ description.init(sample_block);
+
+ auto attributes_to_retrieve = constructAttributesToRetrieve();
+
+ query_params[doubleQuoteIfNeed("attributesToRetrieve")] = attributes_to_retrieve;
+ query_params[doubleQuoteIfNeed("limit")] = std::to_string(max_block_size);
+}
+
+
+MeiliSearchSource::~MeiliSearchSource() = default;
+
+Field getField(JSON value, DataTypePtr type_ptr)
+{
+ TypeIndex type_id = type_ptr->getTypeId();
+
+ if (type_id == TypeIndex::UInt64 || type_id == TypeIndex::UInt32 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt8)
+ {
+ if (value.isBool())
+ return value.getBool();
+ else
+ return value.get<UInt64>();
+ }
+ else if (type_id == TypeIndex::Int64 || type_id == TypeIndex::Int32 || type_id == TypeIndex::Int16 || type_id == TypeIndex::Int8)
+ {
+ return value.get<Int64>();
+ }
+ else if (type_id == TypeIndex::String)
+ {
+ if (value.isObject())
+ return value.toString();
+ else
+ return value.get<String>();
+ }
+ else if (type_id == TypeIndex::Float64 || type_id == TypeIndex::Float32)
+ {
+ return value.get<Float64>();
+ }
+ else if (type_id == TypeIndex::Date)
+ {
+ return UInt16{LocalDate{String(value.toString())}.getDayNum()};
+ }
+ else if (type_id == TypeIndex::Date32)
+ {
+ return Int32{LocalDate{String(value.toString())}.getExtenedDayNum()};
+ }
+ else if (type_id == TypeIndex::DateTime)
+ {
+ ReadBufferFromString in(value.toString());
+ time_t time = 0;
+ readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(type_ptr.get())->getTimeZone());
+ if (time < 0)
+ time = 0;
+ return time;
+ }
+ else if (type_id == TypeIndex::Nullable)
+ {
+ if (value.isNull())
+ return Null();
+
+ const auto * null_type = typeid_cast<const DataTypeNullable *>(type_ptr.get());
+ DataTypePtr nested = null_type->getNestedType();
+
+ return getField(value, nested);
+ }
+ else if (type_id == TypeIndex::Array)
+ {
+ const auto * array_type = typeid_cast<const DataTypeArray *>(type_ptr.get());
+ DataTypePtr nested = array_type->getNestedType();
+
+ Array array;
+ for (const auto el : value)
+ array.push_back(getField(el, nested));
+
+ return array;
+ }
+ else
+ {
+ const std::string_view type_name = magic_enum::enum_name(type_id);
+ throw Exception(ErrorCodes::UNSUPPORTED_MEILISEARCH_TYPE, "MeiliSearch storage doesn't support type: {}", type_name);
+ }
+}
+
+void insertWithTypeId(MutableColumnPtr & column, JSON value, DataTypePtr type_ptr)
+{
+ column->insert(getField(value, type_ptr));
+}
+
+size_t MeiliSearchSource::parseJSON(MutableColumns & columns, const JSON & jres) const
+{
+ size_t cnt_match = 0;
+
+ for (const auto json : jres)
+ {
+ ++cnt_match;
+ size_t cnt_fields = 0;
+ for (const auto kv_pair : json)
+ {
+ ++cnt_fields;
+ const auto & name = kv_pair.getName();
+ size_t pos = description.sample_block.getPositionByName(name);
+ MutableColumnPtr & col = columns[pos];
+ DataTypePtr type_ptr = description.sample_block.getByPosition(pos).type;
+ insertWithTypeId(col, kv_pair.getValue(), type_ptr);
+ }
+ if (cnt_fields != columns.size())
+ throw Exception(
+ ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table, json = {}", json.toString());
+ }
+ return cnt_match;
+}
+
+Chunk MeiliSearchSource::generate()
+{
+ if (all_read)
+ return {};
+
+ MutableColumns columns = description.sample_block.cloneEmptyColumns();
+ query_params[doubleQuoteIfNeed("offset")] = std::to_string(offset);
+
+ size_t cnt_match = 0;
+
+ if (route == QueryRoute::search)
+ {
+ auto response = connection.searchQuery(query_params);
+ JSON jres = JSON(response).begin();
+ if (jres.getName() == "message")
+ throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.toString());
+
+ cnt_match = parseJSON(columns, jres.getValue());
+ }
+ else
+ {
+ auto response = connection.getDocumentsQuery(query_params);
+ JSON jres(response);
+ if (!jres.isArray())
+ {
+ auto error = jres.getWithDefault<String>("message");
+ throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, error);
+ }
+ cnt_match = parseJSON(columns, jres);
+ }
+
+ offset += cnt_match;
+
+ if (cnt_match == 0)
+ {
+ all_read = true;
+ return {};
+ }
+
+ return Chunk(std::move(columns), cnt_match);
+}
+
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h
new file mode 100644
index 0000000000..6ab24e4763
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <cstddef>
+#include <unordered_map>
+#include <Core/ColumnsWithTypeAndName.h>
+#include <Core/ExternalResultDescription.h>
+#include <Processors/Chunk.h>
+#include <Processors/ISource.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+#include <base/JSON.h>
+
+namespace DB
+{
+class MeiliSearchSource final : public ISource
+{
+public:
+ enum QueryRoute
+ {
+ search,
+ documents
+ };
+
+ MeiliSearchSource(
+ const MeiliSearchConfiguration & config,
+ const Block & sample_block,
+ UInt64 max_block_size_,
+ QueryRoute route,
+ std::unordered_map<String, String> query_params_);
+
+ ~MeiliSearchSource() override;
+
+ String getName() const override { return "MeiliSearchSource"; }
+
+private:
+ String doubleQuoteIfNeed(const String & param) const;
+
+ String constructAttributesToRetrieve() const;
+
+ size_t parseJSON(MutableColumns & columns, const JSON & jres) const;
+
+ Chunk generate() override;
+
+ MeiliSearchConnection connection;
+ const UInt64 max_block_size;
+ const QueryRoute route;
+ ExternalResultDescription description;
+ std::unordered_map<String, String> query_params;
+
+ UInt64 offset;
+ bool all_read = false;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp
new file mode 100644
index 0000000000..aa8b437263
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp
@@ -0,0 +1,201 @@
+#include <memory>
+#include <Interpreters/evaluateConstantExpression.h>
+#include <Parsers/ASTFunction.h>
+#include <Parsers/ASTSelectQuery.h>
+#include <Parsers/IAST_fwd.h>
+#include <Processors/Formats/IOutputFormat.h>
+#include <QueryPipeline/Pipe.h>
+#include <Storages/IStorage.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+#include <Storages/MeiliSearch/SinkMeiliSearch.h>
+#include <Storages/MeiliSearch/SourceMeiliSearch.h>
+#include <Storages/MeiliSearch/StorageMeiliSearch.h>
+#include <Storages/SelectQueryInfo.h>
+#include <Storages/StorageFactory.h>
+#include <Storages/StorageInMemoryMetadata.h>
+#include <Storages/checkAndGetLiteralArgument.h>
+#include <Storages/NamedCollectionsHelpers.h>
+#include <Common/logger_useful.h>
+#include <Common/parseAddress.h>
+#include <Common/NamedCollections/NamedCollections.h>
+#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+ extern const int BAD_QUERY_PARAMETER;
+ extern const int BAD_ARGUMENTS;
+}
+
+StorageMeiliSearch::StorageMeiliSearch(
+ const StorageID & table_id,
+ const MeiliSearchConfiguration & config_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment)
+ : IStorage(table_id), config{config_}, log(&Poco::Logger::get("StorageMeiliSearch (" + table_id.table_name + ")"))
+{
+ StorageInMemoryMetadata storage_metadata;
+
+ if (columns_.empty())
+ {
+ auto columns = getTableStructureFromData(config);
+ storage_metadata.setColumns(columns);
+ }
+ else
+ storage_metadata.setColumns(columns_);
+
+ storage_metadata.setConstraints(constraints_);
+ storage_metadata.setComment(comment);
+ setInMemoryMetadata(storage_metadata);
+}
+
+ColumnsDescription StorageMeiliSearch::getTableStructureFromData(const MeiliSearchConfiguration & config_)
+{
+ MeiliSearchColumnDescriptionFetcher fetcher(config_);
+ fetcher.addParam(doubleQuoteString("limit"), "1");
+ return fetcher.fetchColumnsDescription();
+}
+
+String convertASTtoStr(ASTPtr ptr)
+{
+ WriteBufferFromOwnString out;
+ IAST::FormatSettings settings(
+ out, /*one_line*/ true, /*hilite*/ false,
+ /*always_quote_identifiers*/ IdentifierQuotingStyle::BackticksMySQL != IdentifierQuotingStyle::None,
+ /*identifier_quoting_style*/ IdentifierQuotingStyle::BackticksMySQL);
+ ptr->format(settings);
+ return out.str();
+}
+
+ASTPtr getFunctionParams(ASTPtr node, const String & name)
+{
+ if (!node)
+ return nullptr;
+
+ const auto * ptr = node->as<ASTFunction>();
+ if (ptr && ptr->name == name)
+ {
+ if (node->children.size() == 1)
+ return node->children[0];
+ else
+ return nullptr;
+ }
+ for (const auto & next : node->children)
+ {
+ auto res = getFunctionParams(next, name);
+ if (res != nullptr)
+ return res;
+ }
+ return nullptr;
+}
+
+Pipe StorageMeiliSearch::read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr /*context*/,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ size_t max_block_size,
+ size_t /*num_streams*/)
+{
+ storage_snapshot->check(column_names);
+
+ ASTPtr original_where = query_info.query->clone()->as<ASTSelectQuery &>().where();
+ ASTPtr query_params = getFunctionParams(original_where, "meiliMatch");
+
+ MeiliSearchSource::QueryRoute route = MeiliSearchSource::QueryRoute::documents;
+
+ std::unordered_map<String, String> kv_pairs_params;
+ if (query_params)
+ {
+ route = MeiliSearchSource::QueryRoute::search;
+ LOG_TRACE(log, "Query params: {}", convertASTtoStr(query_params));
+ for (const auto & el : query_params->children)
+ {
+ auto str = el->getColumnName();
+ auto it = std::find(str.begin(), str.end(), '=');
+ if (it == str.end())
+ throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "meiliMatch function must have parameters of the form \'key=value\'");
+
+ String key(str.begin() + 1, it);
+ String value(it + 1, str.end() - 1);
+ kv_pairs_params[key] = value;
+ }
+ }
+ else
+ {
+ LOG_TRACE(log, "Query params: none");
+ }
+
+ for (const auto & el : kv_pairs_params)
+ LOG_TRACE(log, "Parsed parameter: key = {}, value = {}", el.first, el.second);
+
+ auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
+
+ return Pipe(std::make_shared<MeiliSearchSource>(config, sample_block, max_block_size, route, kv_pairs_params));
+}
+
+SinkToStoragePtr StorageMeiliSearch::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
+{
+ LOG_TRACE(log, "Trying update index: {}", config.index);
+ return std::make_shared<SinkMeiliSearch>(config, metadata_snapshot->getSampleBlock(), local_context);
+}
+
+MeiliSearchConfiguration StorageMeiliSearch::getConfiguration(ASTs engine_args, ContextPtr context)
+{
+ if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
+ {
+ validateNamedCollection(*named_collection, {"url", "index"}, {"key"});
+
+ String url = named_collection->get<String>("url");
+ String index = named_collection->get<String>("index");
+ String key = named_collection->getOrDefault<String>("key", "");
+
+ if (url.empty() || index.empty())
+ {
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= \"\")");
+ }
+
+ return MeiliSearchConfiguration(url, index, key);
+ }
+ else
+ {
+ if (engine_args.size() < 2 || 3 < engine_args.size())
+ {
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= \"\")");
+ }
+
+ for (auto & engine_arg : engine_args)
+ engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
+
+ String url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
+ String index = checkAndGetLiteralArgument<String>(engine_args[1], "index");
+ String key;
+ if (engine_args.size() == 3)
+ key = checkAndGetLiteralArgument<String>(engine_args[2], "key");
+ return MeiliSearchConfiguration(url, index, key);
+ }
+}
+
+void registerStorageMeiliSearch(StorageFactory & factory)
+{
+ factory.registerStorage(
+ "MeiliSearch",
+ [](const StorageFactory::Arguments & args)
+ {
+ auto config = StorageMeiliSearch::getConfiguration(args.engine_args, args.getLocalContext());
+ return std::make_shared<StorageMeiliSearch>(args.table_id, config, args.columns, args.constraints, args.comment);
+ },
+ {
+ .supports_schema_inference = true,
+ .source_access_type = AccessType::MEILISEARCH,
+ });
+}
+
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h
new file mode 100644
index 0000000000..77cd2afb80
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <Storages/IStorage.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+
+namespace DB
+{
+class StorageMeiliSearch final : public IStorage
+{
+public:
+ StorageMeiliSearch(
+ const StorageID & table_id,
+ const MeiliSearchConfiguration & config_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment);
+
+ String getName() const override { return "MeiliSearch"; }
+
+ Pipe read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams) override;
+
+ SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool async_insert) override;
+
+ static MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
+
+ static ColumnsDescription getTableStructureFromData(const MeiliSearchConfiguration & config_);
+
+private:
+ MeiliSearchConfiguration config;
+
+ Poco::Logger * log;
+};
+
+}