diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/MeiliSearch | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/MeiliSearch')
10 files changed, 907 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp
new file mode 100644
index 0000000000..bbbca00937
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.cpp
@@ -0,0 +1,94 @@
+#include <memory>
+#include <string>
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/Serializations/ISerialization.h>
+#include <IO/ReadHelpers.h>
+#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
+#include <base/JSON.h>
+#include <base/types.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int CANNOT_READ_ALL_DATA;
+    extern const int MEILISEARCH_EXCEPTION;
+}
+
+MeiliSearchColumnDescriptionFetcher::MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config) : connection(config)
+{
+}
+
+void MeiliSearchColumnDescriptionFetcher::addParam(const String & key, const String & val)
+{
+    query_params[key] = val;
+}
+
+// Returns true when the textual form of a JSON number has neither a fraction
+// nor an exponent part (so "1e5" and "1.0" are both reported as non-integers).
+bool checkIfInteger(const String & s)
+{
+    return s.find_first_of(".eE") == String::npos;
+}
+
+// Infers a column type from one sample JSON value:
+// string -> String, array -> Array(type of its first element), bool -> UInt8,
+// null -> Nullable(String), number -> Int64 or Float64, anything else -> String.
+DataTypePtr parseTypeOfField(JSON ptr)
+{
+    if (ptr.isString())
+    {
+        return std::make_shared<DataTypeString>();
+    }
+    if (ptr.isArray())
+    {
+        auto nested_type = parseTypeOfField(ptr.begin());
+        return std::make_shared<DataTypeArray>(nested_type);
+    }
+    if (ptr.isBool())
+    {
+        return std::make_shared<DataTypeUInt8>();
+    }
+    if (ptr.isNull())
+    {
+        // A null sample carries no type information. The nested type must be a
+        // constructed object (the result used to be passed to its own initializer
+        // before it existed), so use String, the same fallback as the last branch.
+        return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+    }
+    if (ptr.isNumber())
+    {
+        if (checkIfInteger(ptr.toString()))
+        {
+            return std::make_shared<DataTypeInt64>();
+        }
+        return std::make_shared<DataTypeFloat64>();
+    }
+    return std::make_shared<DataTypeString>();
+}
+
+ColumnsDescription MeiliSearchColumnDescriptionFetcher::fetchColumnsDescription() const
+{
+    auto response = 
connection.searchQuery(query_params); + JSON jres = JSON(response).begin(); + + if (jres.getName() == "message") + throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString()); + + NamesAndTypesList list; + + for (const JSON kv_pair : jres.getValue().begin()) + { + if (!kv_pair.isNameValuePair()) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Bad response data"); + + list.emplace_back(kv_pair.getName(), parseTypeOfField(kv_pair.getValue())); + } + + return ColumnsDescription(list); +} + +} diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h new file mode 100644 index 0000000000..19b40251d9 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h @@ -0,0 +1,24 @@ +#pragma once + +#include <unordered_map> +#include <Storages/ColumnsDescription.h> +#include <Storages/MeiliSearch/MeiliSearchConnection.h> +#include <base/types.h> + +namespace DB +{ +class MeiliSearchColumnDescriptionFetcher +{ +public: + explicit MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config); + + void addParam(const String & key, const String & val); + + ColumnsDescription fetchColumnsDescription() const; + +private: + std::unordered_map<String, String> query_params; + MeiliSearchConnection connection; +}; + +} diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp new file mode 100644 index 0000000000..e34688a636 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.cpp @@ -0,0 +1,126 @@ +#include <sstream> +#include <string_view> +#include <IO/Operators.h> +#include <IO/WriteBufferFromString.h> +#include <Storages/MeiliSearch/MeiliSearchConnection.h> +#include <Common/Exception.h> + +#include <Poco/StreamCopier.h> + +namespace DB +{ 
+namespace ErrorCodes +{ + extern const int NETWORK_ERROR; +} + +MeiliSearchConnection::MeiliSearchConnection(const MeiliConfig & conf) : config{conf} +{ + Poco::URI uri(config.connection_string); + session.setHost(uri.getHost()); + session.setPort(uri.getPort()); +} + +String MeiliSearchConnection::execPostQuery(const String & url, std::string_view post_fields) const +{ + Poco::URI uri(url); + + String path(uri.getPathAndQuery()); + if (path.empty()) + path = "/"; + + Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_POST, path, Poco::Net::HTTPMessage::HTTP_1_1); + req.setContentType("application/json"); + + if (!config.key.empty()) + req.add("Authorization", "Bearer " + config.key); + + req.setContentLength(post_fields.length()); + + std::ostream & os = session.sendRequest(req); + os << post_fields; + + Poco::Net::HTTPResponse res; + std::istream & is = session.receiveResponse(res); + + // need to separate MeiliSearch response from other situations + // in order to handle it properly + if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4) + { + String response_buffer; + Poco::StreamCopier::copyToString(is, response_buffer); + return response_buffer; + } + else + throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, res.getReason()); +} + +String MeiliSearchConnection::execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const +{ + Poco::URI uri(url); + for (const auto & kv : query_params) + { + uri.addQueryParameter(kv.first, kv.second); + } + + String path(uri.getPathAndQuery()); + if (path.empty()) + path = "/"; + + Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, path, Poco::Net::HTTPMessage::HTTP_1_1); + + if (!config.key.empty()) + req.add("Authorization", "Bearer " + config.key); + + session.sendRequest(req); + + Poco::Net::HTTPResponse res; + std::istream & is = session.receiveResponse(res); + + // need to separate MeiliSearch response from other situations + // in order to handle it 
properly + if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4) + { + String response_buffer; + Poco::StreamCopier::copyToString(is, response_buffer); + return response_buffer; + } + else + throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, res.getReason()); +} + + +String MeiliSearchConnection::searchQuery(const std::unordered_map<String, String> & query_params) const +{ + WriteBufferFromOwnString post_fields; + + post_fields << "{"; + + auto it = query_params.begin(); + while (it != query_params.end()) + { + post_fields << it->first << ":" << it->second; + ++it; + if (it != query_params.end()) + post_fields << ","; + } + + post_fields << "}"; + + String url = config.connection_string + "search"; + return execPostQuery(url, post_fields.str()); +} + +String MeiliSearchConnection::updateQuery(std::string_view data) const +{ + String url = config.connection_string + "documents"; + return execPostQuery(url, data); +} + +String MeiliSearchConnection::getDocumentsQuery(const std::unordered_map<String, String> & query_params) const +{ + String url = config.connection_string + "documents"; + return execGetQuery(url, query_params); +} + +} diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h new file mode 100644 index 0000000000..19083985eb --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/MeiliSearchConnection.h @@ -0,0 +1,51 @@ +#pragma once + +#include <string> +#include <string_view> +#include <unordered_map> +#include <base/types.h> + +#include <Poco/Exception.h> +#include <Poco/Net/HTTPClientSession.h> +#include <Poco/Net/HTTPRequest.h> +#include <Poco/Net/HTTPResponse.h> +#include <Poco/Path.h> +#include <Poco/URI.h> + +namespace DB +{ +struct MeiliSearchConfiguration +{ + String key; + String index; + String connection_string; + + MeiliSearchConfiguration(const String & url_, const String & index_, const String & key_) : key{key_}, 
index{index_}
+    {
+        connection_string = url_ + "/indexes/" + index_ + "/";
+    }
+};
+
+using MeiliConfig = MeiliSearchConfiguration;
+
+class MeiliSearchConnection
+{
+public:
+    explicit MeiliSearchConnection(const MeiliConfig & config);
+
+    String searchQuery(const std::unordered_map<String, String> & query_params) const;
+
+    String getDocumentsQuery(const std::unordered_map<String, String> & query_params) const;
+
+    String updateQuery(std::string_view data) const;
+
+private:
+    String execPostQuery(const String & url, std::string_view post_fields) const;
+
+    String execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const;
+
+    MeiliConfig config;
+    mutable Poco::Net::HTTPClientSession session;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp
new file mode 100644
index 0000000000..32626278bd
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.cpp
@@ -0,0 +1,81 @@
+#include <Formats/FormatFactory.h>
+#include <IO/WriteBufferFromString.h>
+#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
+#include <Storages/MeiliSearch/SinkMeiliSearch.h>
+#include <base/JSON.h>
+#include <base/types.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int MEILISEARCH_EXCEPTION;
+}
+
+SinkMeiliSearch::SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_)
+    : SinkToStorage(sample_block_), connection(config_), local_context{local_context_}, sample_block{sample_block_}
+{
+}
+
+// Narrows `view` to the JSON array that follows the "data": key in the output of
+// the JSON format: "data": [{...}, {...}, {...}]  ->  [{...}, {...}, {...}]
+// Brackets inside string literals are skipped, so values such as "a]b" do not cut
+// the array short. Throws if the key or a balanced array cannot be found.
+void extractData(std::string_view & view)
+{
+    const size_t key_pos = view.find("\"data\":");
+    const size_t begin = key_pos == std::string_view::npos ? key_pos : view.find('[', key_pos);
+    if (begin == std::string_view::npos)
+        throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, "Cannot find the \"data\" array in the JSON output");
+
+    size_t depth = 0;
+    bool in_string = false;
+    for (size_t pos = begin; pos < view.size(); ++pos)
+    {
+        const char c = view[pos];
+        if (in_string)
+        {
+            if (c == '\\')
+                ++pos; // the escaped character can never close the string
+            else if (c == '"')
+                in_string = false;
+        }
+        else if (c == '"')
+            in_string = true;
+        else if (c == '[')
+            ++depth;
+        else if (c == ']' && --depth == 0)
+        {
+            view = view.substr(begin, pos - begin + 1);
+            return;
+        }
+    }
+    throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, "Unbalanced \"data\" array in the JSON output");
+}
+
+void SinkMeiliSearch::writeBlockData(const Block & block) const
+{
+    FormatSettings settings = getFormatSettings(local_context);
+    settings.json.quote_64bit_integers = false;
+    WriteBufferFromOwnString buf;
+    auto writer = FormatFactory::instance().getOutputFormat("JSON", buf, sample_block, local_context, settings);
+    writer->write(block);
+    writer->flush();
+    writer->finalize();
+
+    std::string_view vbuf(buf.str());
+    extractData(vbuf);
+
+    auto response = connection.updateQuery(vbuf);
+    auto jres = JSON(response).begin();
+    if (jres.getName() == "message")
+        throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
+}
+
+void SinkMeiliSearch::consume(Chunk chunk)
+{
+    auto block = getHeader().cloneWithColumns(chunk.detachColumns());
+    writeBlockData(block);
+}
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h
new file mode 100644
index 0000000000..9554a33683
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MeiliSearch/SinkMeiliSearch.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <Core/ExternalResultDescription.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/Sinks/SinkToStorage.h>
+#include <Storages/MeiliSearch/MeiliSearchConnection.h>
+
+namespace DB
+{
+class SinkMeiliSearch : public SinkToStorage
+{
+public:
+    SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_);
+
+    String getName() const override { return "SinkMeiliSearch"; }
+
+    void consume(Chunk chunk) override;
+
+    void writeBlockData(const Block & block) const;
+
+private:
+    MeiliSearchConnection connection;
+    ContextPtr local_context;
+    Block sample_block;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp 
b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp new file mode 100644 index 0000000000..f567af23a1 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.cpp @@ -0,0 +1,231 @@ +#include <Columns/ColumnString.h> +#include <Columns/ColumnVector.h> +#include <Columns/ColumnsNumber.h> +#include <Columns/IColumn.h> +#include <Core/ExternalResultDescription.h> +#include <Core/Field.h> +#include <Core/Types.h> +#include <DataTypes/DataTypeArray.h> +#include <DataTypes/DataTypeDateTime.h> +#include <DataTypes/DataTypeNullable.h> +#include <DataTypes/Serializations/ISerialization.h> +#include <IO/Operators.h> +#include <IO/ReadBufferFromString.h> +#include <IO/ReadHelpers.h> +#include <IO/WriteBufferFromString.h> +#include <IO/WriteHelpers.h> +#include <Storages/MeiliSearch/SourceMeiliSearch.h> +#include <base/JSON.h> +#include <base/range.h> +#include <base/types.h> +#include <magic_enum.hpp> +#include <Common/Exception.h> +#include <Common/quoteString.h> +#include "Interpreters/ProcessList.h" + +namespace DB +{ +namespace ErrorCodes +{ + extern const int MEILISEARCH_EXCEPTION; + extern const int UNSUPPORTED_MEILISEARCH_TYPE; + extern const int MEILISEARCH_MISSING_SOME_COLUMNS; +} + +String MeiliSearchSource::doubleQuoteIfNeed(const String & param) const +{ + if (route == QueryRoute::search) + return doubleQuoteString(param); + return param; +} + +String MeiliSearchSource::constructAttributesToRetrieve() const +{ + WriteBufferFromOwnString columns_to_get; + + if (route == QueryRoute::search) + columns_to_get << "["; + + auto it = description.sample_block.begin(); + while (it != description.sample_block.end()) + { + columns_to_get << doubleQuoteIfNeed(it->name); + ++it; + if (it != description.sample_block.end()) + columns_to_get << ","; + } + + if (route == QueryRoute::search) + columns_to_get << "]"; + + return columns_to_get.str(); +} + +MeiliSearchSource::MeiliSearchSource( + const MeiliSearchConfiguration & config, + const 
Block & sample_block, + UInt64 max_block_size_, + QueryRoute route_, + std::unordered_map<String, String> query_params_) + : ISource(sample_block.cloneEmpty()) + , connection(config) + , max_block_size{max_block_size_} + , route{route_} + , query_params{query_params_} + , offset{0} +{ + description.init(sample_block); + + auto attributes_to_retrieve = constructAttributesToRetrieve(); + + query_params[doubleQuoteIfNeed("attributesToRetrieve")] = attributes_to_retrieve; + query_params[doubleQuoteIfNeed("limit")] = std::to_string(max_block_size); +} + + +MeiliSearchSource::~MeiliSearchSource() = default; + +Field getField(JSON value, DataTypePtr type_ptr) +{ + TypeIndex type_id = type_ptr->getTypeId(); + + if (type_id == TypeIndex::UInt64 || type_id == TypeIndex::UInt32 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt8) + { + if (value.isBool()) + return value.getBool(); + else + return value.get<UInt64>(); + } + else if (type_id == TypeIndex::Int64 || type_id == TypeIndex::Int32 || type_id == TypeIndex::Int16 || type_id == TypeIndex::Int8) + { + return value.get<Int64>(); + } + else if (type_id == TypeIndex::String) + { + if (value.isObject()) + return value.toString(); + else + return value.get<String>(); + } + else if (type_id == TypeIndex::Float64 || type_id == TypeIndex::Float32) + { + return value.get<Float64>(); + } + else if (type_id == TypeIndex::Date) + { + return UInt16{LocalDate{String(value.toString())}.getDayNum()}; + } + else if (type_id == TypeIndex::Date32) + { + return Int32{LocalDate{String(value.toString())}.getExtenedDayNum()}; + } + else if (type_id == TypeIndex::DateTime) + { + ReadBufferFromString in(value.toString()); + time_t time = 0; + readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(type_ptr.get())->getTimeZone()); + if (time < 0) + time = 0; + return time; + } + else if (type_id == TypeIndex::Nullable) + { + if (value.isNull()) + return Null(); + + const auto * null_type = typeid_cast<const DataTypeNullable 
*>(type_ptr.get()); + DataTypePtr nested = null_type->getNestedType(); + + return getField(value, nested); + } + else if (type_id == TypeIndex::Array) + { + const auto * array_type = typeid_cast<const DataTypeArray *>(type_ptr.get()); + DataTypePtr nested = array_type->getNestedType(); + + Array array; + for (const auto el : value) + array.push_back(getField(el, nested)); + + return array; + } + else + { + const std::string_view type_name = magic_enum::enum_name(type_id); + throw Exception(ErrorCodes::UNSUPPORTED_MEILISEARCH_TYPE, "MeiliSearch storage doesn't support type: {}", type_name); + } +} + +void insertWithTypeId(MutableColumnPtr & column, JSON value, DataTypePtr type_ptr) +{ + column->insert(getField(value, type_ptr)); +} + +size_t MeiliSearchSource::parseJSON(MutableColumns & columns, const JSON & jres) const +{ + size_t cnt_match = 0; + + for (const auto json : jres) + { + ++cnt_match; + size_t cnt_fields = 0; + for (const auto kv_pair : json) + { + ++cnt_fields; + const auto & name = kv_pair.getName(); + size_t pos = description.sample_block.getPositionByName(name); + MutableColumnPtr & col = columns[pos]; + DataTypePtr type_ptr = description.sample_block.getByPosition(pos).type; + insertWithTypeId(col, kv_pair.getValue(), type_ptr); + } + if (cnt_fields != columns.size()) + throw Exception( + ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table, json = {}", json.toString()); + } + return cnt_match; +} + +Chunk MeiliSearchSource::generate() +{ + if (all_read) + return {}; + + MutableColumns columns = description.sample_block.cloneEmptyColumns(); + query_params[doubleQuoteIfNeed("offset")] = std::to_string(offset); + + size_t cnt_match = 0; + + if (route == QueryRoute::search) + { + auto response = connection.searchQuery(query_params); + JSON jres = JSON(response).begin(); + if (jres.getName() == "message") + throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.toString()); + + cnt_match = 
parseJSON(columns, jres.getValue()); + } + else + { + auto response = connection.getDocumentsQuery(query_params); + JSON jres(response); + if (!jres.isArray()) + { + auto error = jres.getWithDefault<String>("message"); + throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, error); + } + cnt_match = parseJSON(columns, jres); + } + + offset += cnt_match; + + if (cnt_match == 0) + { + all_read = true; + return {}; + } + + return Chunk(std::move(columns), cnt_match); +} + + +} diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h new file mode 100644 index 0000000000..6ab24e4763 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/SourceMeiliSearch.h @@ -0,0 +1,53 @@ +#pragma once + +#include <cstddef> +#include <unordered_map> +#include <Core/ColumnsWithTypeAndName.h> +#include <Core/ExternalResultDescription.h> +#include <Processors/Chunk.h> +#include <Processors/ISource.h> +#include <Storages/MeiliSearch/MeiliSearchConnection.h> +#include <base/JSON.h> + +namespace DB +{ +class MeiliSearchSource final : public ISource +{ +public: + enum QueryRoute + { + search, + documents + }; + + MeiliSearchSource( + const MeiliSearchConfiguration & config, + const Block & sample_block, + UInt64 max_block_size_, + QueryRoute route, + std::unordered_map<String, String> query_params_); + + ~MeiliSearchSource() override; + + String getName() const override { return "MeiliSearchSource"; } + +private: + String doubleQuoteIfNeed(const String & param) const; + + String constructAttributesToRetrieve() const; + + size_t parseJSON(MutableColumns & columns, const JSON & jres) const; + + Chunk generate() override; + + MeiliSearchConnection connection; + const UInt64 max_block_size; + const QueryRoute route; + ExternalResultDescription description; + std::unordered_map<String, String> query_params; + + UInt64 offset; + bool all_read = false; +}; + +} diff --git 
a/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp new file mode 100644 index 0000000000..aa8b437263 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.cpp @@ -0,0 +1,201 @@ +#include <memory> +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTSelectQuery.h> +#include <Parsers/IAST_fwd.h> +#include <Processors/Formats/IOutputFormat.h> +#include <QueryPipeline/Pipe.h> +#include <Storages/IStorage.h> +#include <Storages/MeiliSearch/MeiliSearchConnection.h> +#include <Storages/MeiliSearch/SinkMeiliSearch.h> +#include <Storages/MeiliSearch/SourceMeiliSearch.h> +#include <Storages/MeiliSearch/StorageMeiliSearch.h> +#include <Storages/SelectQueryInfo.h> +#include <Storages/StorageFactory.h> +#include <Storages/StorageInMemoryMetadata.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <Common/logger_useful.h> +#include <Common/parseAddress.h> +#include <Common/NamedCollections/NamedCollections.h> +#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h> + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_QUERY_PARAMETER; + extern const int BAD_ARGUMENTS; +} + +StorageMeiliSearch::StorageMeiliSearch( + const StorageID & table_id, + const MeiliSearchConfiguration & config_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment) + : IStorage(table_id), config{config_}, log(&Poco::Logger::get("StorageMeiliSearch (" + table_id.table_name + ")")) +{ + StorageInMemoryMetadata storage_metadata; + + if (columns_.empty()) + { + auto columns = getTableStructureFromData(config); + storage_metadata.setColumns(columns); + } + else + storage_metadata.setColumns(columns_); + + 
storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); +} + +ColumnsDescription StorageMeiliSearch::getTableStructureFromData(const MeiliSearchConfiguration & config_) +{ + MeiliSearchColumnDescriptionFetcher fetcher(config_); + fetcher.addParam(doubleQuoteString("limit"), "1"); + return fetcher.fetchColumnsDescription(); +} + +String convertASTtoStr(ASTPtr ptr) +{ + WriteBufferFromOwnString out; + IAST::FormatSettings settings( + out, /*one_line*/ true, /*hilite*/ false, + /*always_quote_identifiers*/ IdentifierQuotingStyle::BackticksMySQL != IdentifierQuotingStyle::None, + /*identifier_quoting_style*/ IdentifierQuotingStyle::BackticksMySQL); + ptr->format(settings); + return out.str(); +} + +ASTPtr getFunctionParams(ASTPtr node, const String & name) +{ + if (!node) + return nullptr; + + const auto * ptr = node->as<ASTFunction>(); + if (ptr && ptr->name == name) + { + if (node->children.size() == 1) + return node->children[0]; + else + return nullptr; + } + for (const auto & next : node->children) + { + auto res = getFunctionParams(next, name); + if (res != nullptr) + return res; + } + return nullptr; +} + +Pipe StorageMeiliSearch::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t max_block_size, + size_t /*num_streams*/) +{ + storage_snapshot->check(column_names); + + ASTPtr original_where = query_info.query->clone()->as<ASTSelectQuery &>().where(); + ASTPtr query_params = getFunctionParams(original_where, "meiliMatch"); + + MeiliSearchSource::QueryRoute route = MeiliSearchSource::QueryRoute::documents; + + std::unordered_map<String, String> kv_pairs_params; + if (query_params) + { + route = MeiliSearchSource::QueryRoute::search; + LOG_TRACE(log, "Query params: {}", convertASTtoStr(query_params)); + for (const auto & el : 
query_params->children) + { + auto str = el->getColumnName(); + auto it = std::find(str.begin(), str.end(), '='); + if (it == str.end()) + throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "meiliMatch function must have parameters of the form \'key=value\'"); + + String key(str.begin() + 1, it); + String value(it + 1, str.end() - 1); + kv_pairs_params[key] = value; + } + } + else + { + LOG_TRACE(log, "Query params: none"); + } + + for (const auto & el : kv_pairs_params) + LOG_TRACE(log, "Parsed parameter: key = {}, value = {}", el.first, el.second); + + auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); + + return Pipe(std::make_shared<MeiliSearchSource>(config, sample_block, max_block_size, route, kv_pairs_params)); +} + +SinkToStoragePtr StorageMeiliSearch::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) +{ + LOG_TRACE(log, "Trying update index: {}", config.index); + return std::make_shared<SinkMeiliSearch>(config, metadata_snapshot->getSampleBlock(), local_context); +} + +MeiliSearchConfiguration StorageMeiliSearch::getConfiguration(ASTs engine_args, ContextPtr context) +{ + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) + { + validateNamedCollection(*named_collection, {"url", "index"}, {"key"}); + + String url = named_collection->get<String>("url"); + String index = named_collection->get<String>("index"); + String key = named_collection->getOrDefault<String>("key", ""); + + if (url.empty() || index.empty()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= \"\")"); + } + + return MeiliSearchConfiguration(url, index, key); + } + else + { + if (engine_args.size() < 2 || 3 < engine_args.size()) + { + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= 
\"\")"); + } + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + String url = checkAndGetLiteralArgument<String>(engine_args[0], "url"); + String index = checkAndGetLiteralArgument<String>(engine_args[1], "index"); + String key; + if (engine_args.size() == 3) + key = checkAndGetLiteralArgument<String>(engine_args[2], "key"); + return MeiliSearchConfiguration(url, index, key); + } +} + +void registerStorageMeiliSearch(StorageFactory & factory) +{ + factory.registerStorage( + "MeiliSearch", + [](const StorageFactory::Arguments & args) + { + auto config = StorageMeiliSearch::getConfiguration(args.engine_args, args.getLocalContext()); + return std::make_shared<StorageMeiliSearch>(args.table_id, config, args.columns, args.constraints, args.comment); + }, + { + .supports_schema_inference = true, + .source_access_type = AccessType::MEILISEARCH, + }); +} + + +} diff --git a/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h new file mode 100644 index 0000000000..77cd2afb80 --- /dev/null +++ b/contrib/clickhouse/src/Storages/MeiliSearch/StorageMeiliSearch.h @@ -0,0 +1,41 @@ +#pragma once + +#include <Storages/IStorage.h> +#include <Storages/MeiliSearch/MeiliSearchConnection.h> + +namespace DB +{ +class StorageMeiliSearch final : public IStorage +{ +public: + StorageMeiliSearch( + const StorageID & table_id, + const MeiliSearchConfiguration & config_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment); + + String getName() const override { return "MeiliSearch"; } + + Pipe read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + + SinkToStoragePtr write(const ASTPtr & query, 
const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool async_insert) override; + + static MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context); + + static ColumnsDescription getTableStructureFromData(const MeiliSearchConfiguration & config_); + +private: + MeiliSearchConfiguration config; + + Poco::Logger * log; +}; + +} |