diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/TableFunctions | |
parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/TableFunctions')
71 files changed, 6058 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunction.cpp b/contrib/clickhouse/src/TableFunctions/ITableFunction.cpp new file mode 100644 index 0000000000..137e1dc27f --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunction.cpp @@ -0,0 +1,53 @@ +#include <TableFunctions/ITableFunction.h> +#include <Interpreters/Context.h> +#include <Storages/StorageFactory.h> +#include <Storages/StorageTableFunction.h> +#include <Access/Common/AccessFlags.h> +#include <Common/ProfileEvents.h> +#include <TableFunctions/TableFunctionFactory.h> + + +namespace ProfileEvents +{ + extern const Event TableFunctionExecute; +} + +namespace DB +{ + +AccessType ITableFunction::getSourceAccessType() const +{ + return StorageFactory::instance().getSourceAccessType(getStorageTypeName()); +} + +StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, + ColumnsDescription cached_columns, bool use_global_context, bool is_insert_query) const +{ + ProfileEvents::increment(ProfileEvents::TableFunctionExecute); + + AccessFlags required_access = getSourceAccessType(); + auto table_function_properties = TableFunctionFactory::instance().tryGetProperties(getName()); + if (is_insert_query || !(table_function_properties && table_function_properties->allow_readonly)) + required_access |= AccessType::CREATE_TEMPORARY_TABLE; + context->checkAccess(required_access); + + auto context_to_use = use_global_context ? context->getGlobalContext() : context; + + if (cached_columns.empty()) + return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query); + + if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query)) + return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query); + + auto this_table_function = shared_from_this(); + auto get_storage = [=]() -> StoragePtr + { + return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query); + }; + + /// It will request actual table structure and create underlying storage lazily + return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage), + std::move(cached_columns), needStructureConversion()); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunction.h b/contrib/clickhouse/src/TableFunctions/ITableFunction.h new file mode 100644 index 0000000000..1946d8e890 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunction.h @@ -0,0 +1,118 @@ +#pragma once + +#include <Parsers/IAST_fwd.h> +#include <Storages/IStorage_fwd.h> +#include <Storages/ColumnsDescription.h> +#include <Access/Common/AccessType.h> +#include <Common/FunctionDocumentation.h> +#include <Analyzer/IQueryTreeNode.h> + +#include <memory> +#include <string> + + +namespace DB +{ + +class Context; + +/** Interface for table functions. + * + * Table functions are not relevant to other functions. + * The table function can be specified in the FROM section instead of the [db.]Table + * The table function returns a temporary StoragePtr object that is used to execute the query. + * + * Example: + * SELECT count() FROM remote('example01-01-1', merge, hits) + * - go to `example01-01-1`, in `merge` database, `hits` table. + * + * + * When creating table AS table_function(...) we probably don't know structure of the table + * and have to request if from remote server, because structure is required to create a Storage. + * To avoid failures on server startup, we write obtained structure to metadata file. + * So, table function may have two different columns lists: + * - cached_columns written to metadata + * - the list returned from getActualTableStructure(...) + * See StorageTableFunctionProxy. + */ + +class ITableFunction : public std::enable_shared_from_this<ITableFunction> +{ +public: + static inline std::string getDatabaseName() { return "_table_function"; } + + /// Get the main function name. + virtual std::string getName() const = 0; + + /// Returns true if we always know table structure when executing table function + /// (e.g. structure is specified in table function arguments) + virtual bool hasStaticStructure() const { return false; } + /// Returns false if storage returned by table function supports type conversion (e.g. StorageDistributed) + virtual bool needStructureConversion() const { return true; } + + /** Return array of table function arguments indexes for which query tree analysis must be skipped. + * It is important for table functions that take subqueries, because otherwise analyzer will resolve them. + */ + virtual std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & /*query_node_table_function*/, ContextPtr /*context*/) const { return {}; } + + virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {} + + /// Returns actual table structure probably requested from remote server, may fail + virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0; + + /// Check if table function needs a structure hint from SELECT query in case of + /// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...) + /// It's used for schema inference. + virtual bool needStructureHint() const { return false; } + + /// Set a structure hint from SELECT query in case of + /// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...) + /// This hint could be used not to repeat schema in function arguments. + virtual void setStructureHint(const ColumnsDescription &) {} + + /// Used for table functions that can use structure hint during INSERT INTO ... SELECT ... FROM table_function(...) + /// It returns possible virtual column names of corresponding storage. If select query contains + /// one of these columns, the structure from insertion table won't be used as a structure hint, + /// because we cannot determine which column from table correspond to this virtual column. + virtual std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const { return {}; } + + virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; } + + /// Create storage according to the query. + StoragePtr + execute(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns_ = {}, bool use_global_context = false, bool is_insert = false) const; + + virtual ~ITableFunction() = default; + +protected: + virtual AccessType getSourceAccessType() const; + +private: + virtual StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0; + + virtual const char * getStorageTypeName() const = 0; +}; + +/// Properties of table function that are independent of argument types and parameters. +struct TableFunctionProperties +{ + FunctionDocumentation documentation; + + /** It is determined by the possibility of modifying any data or making requests to arbitrary hostnames. + * + * If users can make a request to an arbitrary hostname, they can get the info from the internal network + * or manipulate internal APIs (say - put some data into Memcached, which is available only in the corporate network). + * This is named "SSRF attack". + * Or a user can use an open ClickHouse server to amplify DoS attacks. + * + * In those cases, the table function should not be allowed in readonly mode. + */ + bool allow_readonly = false; +}; + + +using TableFunctionPtr = std::shared_ptr<ITableFunction>; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionCluster.h b/contrib/clickhouse/src/TableFunctions/ITableFunctionCluster.h new file mode 100644 index 0000000000..4776ff44a3 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionCluster.h @@ -0,0 +1,73 @@ +#pragma once + +#include "clickhouse_config.h" + +#include <Interpreters/Context.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Storages/StorageS3Cluster.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionAzureBlobStorage.h> +#include <TableFunctions/TableFunctionS3.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CLUSTER_DOESNT_EXIST; + extern const int LOGICAL_ERROR; +} + +/// Base class for *Cluster table functions that require cluster_name for the first argument. +template <typename Base> +class ITableFunctionCluster : public Base +{ +public: + String getName() const override = 0; + String getSignature() const override = 0; + + static void addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context) + { + if (args.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); + + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + Base::addColumnsStructureToArguments(args, desired_structure, context); + args.insert(args.begin(), cluster_name_arg); + } + +protected: + void parseArguments(const ASTPtr & ast, ContextPtr context) override + { + /// Clone ast function, because we can modify its arguments like removing cluster_name + Base::parseArguments(ast->clone(), context); + } + + void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override + { + if (args.empty()) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature()); + + /// Evaluate only first argument, everything else will be done Base class + args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context); + + /// Cluster name is always the first + cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name"); + + if (!context->tryGetCluster(cluster_name)) + throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name); + + /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is + args.erase(args.begin()); + + Base::parseArgumentsImpl(args, context); + } + + String cluster_name; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionDataLake.h b/contrib/clickhouse/src/TableFunctions/ITableFunctionDataLake.h new file mode 100644 index 0000000000..e9ecb3d577 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionDataLake.h @@ -0,0 +1,66 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +# include <Access/Common/AccessFlags.h> +# include <Formats/FormatFactory.h> +# include <Interpreters/Context.h> +# include <Interpreters/parseColumnsListForTableFunction.h> +# include <Storages/IStorage.h> +# include <TableFunctions/ITableFunction.h> + +namespace DB +{ + +template <typename Name, typename Storage, typename TableFunction> +class ITableFunctionDataLake : public TableFunction +{ +public: + static constexpr auto name = Name::name; + std::string getName() const override { return name; } + +protected: + StoragePtr executeImpl( + const ASTPtr & /*ast_function*/, + ContextPtr context, + const std::string & table_name, + ColumnsDescription /*cached_columns*/, + bool /*is_insert_query*/) const override + { + ColumnsDescription columns; + if (TableFunction::configuration.structure != "auto") + columns = parseColumnsListFromString(TableFunction::configuration.structure, context); + + StoragePtr storage = std::make_shared<Storage>( + TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name), + columns, ConstraintsDescription{}, String{}, std::nullopt); + + storage->startup(); + return storage; + } + + const char * getStorageTypeName() const override { return Storage::name; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override + { + if (TableFunction::configuration.structure == "auto") + { + context->checkAccess(TableFunction::getSourceAccessType()); + return Storage::getTableStructureFromData(TableFunction::configuration, std::nullopt, context); + } + + return parseColumnsListFromString(TableFunction::configuration.structure, context); + } + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override + { + /// Set default format to Parquet if it's not specified in arguments. + TableFunction::configuration.format = "Parquet"; + TableFunction::parseArguments(ast_function, context); + } +}; +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.cpp b/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.cpp new file mode 100644 index 0000000000..b88af85530 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.cpp @@ -0,0 +1,126 @@ +#include <TableFunctions/ITableFunctionFileLike.h> +#include <Interpreters/parseColumnsListForTableFunction.h> + +#include <Parsers/ASTFunction.h> + +#include <Common/Exception.h> + +#include <Storages/StorageFile.h> +#include <Storages/checkAndGetLiteralArgument.h> + +#include <Interpreters/evaluateConstantExpression.h> + +#include <Formats/FormatFactory.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void ITableFunctionFileLike::parseFirstArguments(const ASTPtr & arg, const ContextPtr &) +{ + filename = checkAndGetLiteralArgument<String>(arg, "source"); +} + +String ITableFunctionFileLike::getFormatFromFirstArgument() +{ + return FormatFactory::instance().getFormatFromFileName(filename, true); +} + +bool ITableFunctionFileLike::supportsReadingSubsetOfColumns(const ContextPtr & context) +{ + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format, context); +} + +void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + /// Parse args + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", getName()); + + ASTs & args = args_func.at(0)->children; + parseArgumentsImpl(args, context); +} + +void ITableFunctionFileLike::parseArgumentsImpl(ASTs & args, const ContextPtr & context) +{ + if (args.empty() || args.size() > 4) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature()); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + parseFirstArguments(args[0], context); + + if (args.size() > 1) + format = checkAndGetLiteralArgument<String>(args[1], "format"); + + if (format == "auto") + format = getFormatFromFirstArgument(); + + if (args.size() > 2) + { + structure = checkAndGetLiteralArgument<String>(args[2], "structure"); + if (structure.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Table structure is empty for table function '{}'. If you want to use automatic schema inference, use 'auto'", + getName()); + } + + if (args.size() > 3) + compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method"); +} + +void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr &) +{ + if (args.empty() || args.size() > getMaxNumberOfArguments()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size()); + + auto structure_literal = std::make_shared<ASTLiteral>(structure); + + /// f(filename) + if (args.size() == 1) + { + /// Add format=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + /// f(filename, format) + else if (args.size() == 2) + { + args.push_back(structure_literal); + } + /// f(filename, format, 'auto') + else if (args.size() == 3) + { + args.back() = structure_literal; + } + /// f(filename, format, 'auto', compression) + else if (args.size() == 4) + { + args[args.size() - 2] = structure_literal; + } +} + +StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + ColumnsDescription columns; + if (structure != "auto") + columns = parseColumnsListFromString(structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + + StoragePtr storage = getStorage(filename, format, columns, context, table_name, compression_method); + storage->startup(); + return storage; +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.h b/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.h new file mode 100644 index 0000000000..5fe8658779 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionFileLike.h @@ -0,0 +1,61 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include "Parsers/IAST_fwd.h" + +namespace DB +{ +class ColumnsDescription; +class Context; + +/* + * function(source, [format, structure, compression_method]) - creates a temporary storage from formatted source + */ +class ITableFunctionFileLike : public ITableFunction +{ +public: + static constexpr auto signature = " - filename\n" + " - filename, format\n" + " - filename, format, structure\n" + " - filename, format, structure, compression_method\n"; + virtual String getSignature() const + { + return signature; + } + + bool needStructureHint() const override { return structure == "auto"; } + + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + + bool supportsReadingSubsetOfColumns(const ContextPtr & context) override; + + static size_t getMaxNumberOfArguments() { return 4; } + + static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr &); + +protected: + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context); + + virtual void parseFirstArguments(const ASTPtr & arg, const ContextPtr & context); + virtual String getFormatFromFirstArgument(); + + String filename; + String path_to_archive; + String format = "auto"; + String structure = "auto"; + String compression_method = "auto"; + ColumnsDescription structure_hint; + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + virtual StoragePtr getStorage( + const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method) const = 0; + + bool hasStaticStructure() const override { return structure != "auto"; } +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.cpp b/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.cpp new file mode 100644 index 0000000000..59702259b3 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.cpp @@ -0,0 +1,114 @@ +#include <DataTypes/DataTypeFactory.h> +#include <IO/ReadHelpers.h> +#include <IO/WriteHelpers.h> +#include <IO/ReadWriteBufferFromHTTP.h> +#include <IO/ConnectionTimeouts.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/parseQuery.h> +#include <Storages/StorageXDBC.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/ITableFunctionXDBC.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Poco/Net/HTTPRequest.h> +#include <Common/Exception.h> +#include "registerTableFunctions.h" + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int LOGICAL_ERROR; +} + +void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & args_func = ast_function->as<ASTFunction &>(); + + if (!args_func.arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", getName()); + + ASTs & args = args_func.arguments->children; + if (args.size() != 2 && args.size() != 3) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{0}' requires 2 or 3 arguments: {0}('DSN', table) or {0}('DSN', schema, table)", getName()); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + if (args.size() == 3) + { + connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>(); + schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>(); + remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>(); + } + else if (args.size() == 2) + { + connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>(); + remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>(); + } +} + +void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const +{ + if (!helper) + { + helper = createBridgeHelper(context, context->getSettingsRef().http_receive_timeout.value, connection_string, context->getSettingsRef().odbc_bridge_use_connection_pooling.value); + helper->startBridgeSync(); + } +} + +ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + startBridgeIfNot(context); + + /// Infer external table structure. + Poco::URI columns_info_uri = helper->getColumnsInfoURI(); + columns_info_uri.addQueryParameter("connection_string", connection_string); + if (!schema_name.empty()) + columns_info_uri.addQueryParameter("schema", schema_name); + columns_info_uri.addQueryParameter("table", remote_table_name); + + bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls; + columns_info_uri.addQueryParameter("external_table_functions_use_nulls", toString(use_nulls)); + + Poco::Net::HTTPBasicCredentials credentials{}; + ReadWriteBufferFromHTTP buf( + columns_info_uri, + Poco::Net::HTTPRequest::HTTP_POST, + {}, + ConnectionTimeouts::getHTTPTimeouts( + context->getSettingsRef(), + {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}), + credentials); + + std::string columns_info; + readStringBinary(columns_info, buf); + NamesAndTypesList columns = NamesAndTypesList::parse(columns_info); + + return ColumnsDescription{columns}; +} + +StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + startBridgeIfNot(context); + auto columns = getActualTableStructure(context, is_insert_query); + auto result = std::make_shared<StorageXDBC>( + StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper); + result->startup(); + return result; +} + +void registerTableFunctionJDBC(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionJDBC>(); +} + +void registerTableFunctionODBC(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionODBC>(); +} +} diff --git a/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.h b/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.h new file mode 100644 index 0000000000..2e4a8894da --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/ITableFunctionXDBC.h @@ -0,0 +1,80 @@ +#pragma once + +#include <Storages/StorageXDBC.h> +#include <TableFunctions/ITableFunction.h> +#include <Poco/Util/AbstractConfiguration.h> +#include <BridgeHelper/XDBCBridgeHelper.h> + +#include "clickhouse_config.h" + +namespace DB +{ +/** + * Base class for table functions, that works over external bridge + * Xdbc (Xdbc connect string, table) - creates a temporary StorageXDBC. + */ +class ITableFunctionXDBC : public ITableFunction +{ +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + /* A factory method to create bridge helper, that will assist in remote interaction */ + virtual BridgeHelperPtr createBridgeHelper(ContextPtr context, + Poco::Timespan http_timeout_, + const std::string & connection_string_, + bool use_connection_pooling_) const = 0; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + void startBridgeIfNot(ContextPtr context) const; + + String connection_string; + String schema_name; + String remote_table_name; + mutable BridgeHelperPtr helper; +}; + +class TableFunctionJDBC : public ITableFunctionXDBC +{ +public: + static constexpr auto name = "jdbc"; + std::string getName() const override + { + return name; + } + +private: + BridgeHelperPtr createBridgeHelper(ContextPtr context, + Poco::Timespan http_timeout_, + const std::string & connection_string_, + bool use_connection_pooling_) const override + { + return std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(context, http_timeout_, connection_string_, use_connection_pooling_); + } + + const char * getStorageTypeName() const override { return "JDBC"; } +}; + +class TableFunctionODBC : public ITableFunctionXDBC +{ +public: + static constexpr auto name = "odbc"; + std::string getName() const override + { + return name; + } + +private: + BridgeHelperPtr createBridgeHelper(ContextPtr context, + Poco::Timespan http_timeout_, + const std::string & connection_string_, + bool use_connection_pooling_) const override + { + return std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, http_timeout_, connection_string_, use_connection_pooling_); + } + + const char * getStorageTypeName() const override { return "ODBC"; } +}; +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.cpp new file mode 100644 index 0000000000..c2829fd905 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.cpp @@ -0,0 +1,317 @@ +#include "clickhouse_config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionS3.h> +#include <TableFunctions/TableFunctionAzureBlobStorage.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Access/Common/AccessFlags.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTIdentifier.h> +#include <Parsers/ASTFunction.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/StorageAzureBlob.h> +#include <Storages/StorageURL.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <Formats/FormatFactory.h> +#include "registerTableFunctions.h" +#error #include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h> +#include <DataTypes/DataTypeString.h> +#include <DataTypes/DataTypesNumber.h> + +#include <boost/algorithm/string.hpp> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + +bool isConnectionString(const std::string & candidate) +{ + return !candidate.starts_with("http"); +} + +} + +void TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context) +{ + /// Supported signatures: + /// + /// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]) + /// + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) + { + StorageAzureBlob::processNamedCollectionResult(configuration, *named_collection); + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto") + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + } + else + { + if (engine_args.size() < 3 || engine_args.size() > 8) + throw Exception( + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Azure requires 3 to 7 arguments: " + "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])"); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); + + std::unordered_map<std::string_view, size_t> engine_args_to_idx; + + configuration.connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url"); + configuration.is_connection_string = isConnectionString(configuration.connection_url); + + configuration.container = checkAndGetLiteralArgument<String>(engine_args[1], "container"); + configuration.blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath"); + + auto is_format_arg + = [](const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); }; + + if (engine_args.size() == 4) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name/structure"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + } + else + { + configuration.structure = fourth_arg; + } + } + else if (engine_args.size() == 5) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key"); + } + } + else if (engine_args.size() == 6) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression"); + configuration.structure = checkAndGetLiteralArgument<String>(engine_args[5], "structure"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name/structure"); + if (is_format_arg(sixth_arg)) + configuration.format = sixth_arg; + else + configuration.structure = sixth_arg; + } + } + else if (engine_args.size() == 7) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name"); + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression"); + } + else if (engine_args.size() == 8) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name"); + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression"); + configuration.structure = checkAndGetLiteralArgument<String>(engine_args[7], "structure"); + } + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto") + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + } +} + +void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + /// Clone ast function, because we can modify its arguments like removing headers. + auto ast_copy = ast_function->clone(); + + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName()); + + auto & args = args_func.at(0)->children; + + parseArgumentsImpl(args, context); +} + +void TableFunctionAzureBlobStorage::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context) +{ + if (tryGetNamedCollectionWithOverrides(args, context)) + { + /// In case of named collection, just add key-value pair "structure='...'" + /// at the end of arguments to override existed structure. + ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure)}; + auto equal_func = makeASTFunction("equals", std::move(equal_func_args)); + args.push_back(equal_func); + } + else + { + if (args.size() < 3 || args.size() > 8) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Azure requires 3 to 7 arguments: " + "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])"); + + auto structure_literal = std::make_shared<ASTLiteral>(structure); + + auto is_format_arg + = [](const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); }; + + + if (args.size() == 3) + { + /// Add format=auto & compression=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + else if (args.size() == 4) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name/structure"); + if (is_format_arg(fourth_arg)) + { + /// Add compression=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + else + { + args.back() = structure_literal; + } + } + else if (args.size() == 5) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name"); + if (!is_format_arg(fourth_arg)) + { + /// Add format=auto & compression=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(std::make_shared<ASTLiteral>("auto")); + } + args.push_back(structure_literal); + } + else if (args.size() == 6) + { + auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name"); + if (!is_format_arg(fourth_arg)) + { + /// Add compression=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + else + { + args.back() = structure_literal; + } + } + else if (args.size() == 7) + { + args.push_back(structure_literal); + } + else if (args.size() == 8) + { + args.back() = structure_literal; + } + } +} + +ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context, bool is_insert_query) const +{ + if (configuration.structure == "auto") + { + context->checkAccess(getSourceAccessType()); + auto client = StorageAzureBlob::createClient(configuration, !is_insert_query); + auto settings = StorageAzureBlob::createSettings(context); + + auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings)); + return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context, false); + } + + return parseColumnsListFromString(configuration.structure, context); +} + +bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns(const ContextPtr & context) +{ + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context); +} + +StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto client = StorageAzureBlob::createClient(configuration, !is_insert_query); + auto settings = StorageAzureBlob::createSettings(context); + + ColumnsDescription columns; + if (configuration.structure != "auto") + columns = parseColumnsListFromString(configuration.structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + + StoragePtr storage = std::make_shared<StorageAzureBlob>( + configuration, + std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + String{}, + /// No format_settings for table function Azure + std::nullopt, + /* distributed_processing */ false, + nullptr); + + storage->startup(); + + return storage; +} + +void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionAzureBlobStorage>( + {.documentation + = {.description=R"(The table function can be used to read the data stored on Azure Blob Storage.)", + .examples{{"azureBlobStorage", "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])", ""}}}, + .allow_readonly = false}); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.h b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.h new file mode 100644 index 0000000000..98ed603243 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorage.h @@ -0,0 +1,83 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include <TableFunctions/ITableFunction.h> +#include <Storages/StorageAzureBlob.h> + + +namespace DB +{ + +class Context; + +/* AzureBlob(source, [access_key_id, secret_access_key,] [format, compression, structure]) - creates a temporary storage for a file in AzureBlob. + */ +class TableFunctionAzureBlobStorage : public ITableFunction +{ +public: + static constexpr auto name = "azureBlobStorage"; + + static constexpr auto signature = " - connection_string, container_name, blobpath\n" + " - connection_string, container_name, blobpath, structure \n" + " - connection_string, container_name, blobpath, format \n" + " - connection_string, container_name, blobpath, format, compression \n" + " - connection_string, container_name, blobpath, format, compression, structure \n" + " - storage_account_url, container_name, blobpath, account_name, account_key\n" + " - storage_account_url, container_name, blobpath, account_name, account_key, structure\n" + " - storage_account_url, container_name, blobpath, account_name, account_key, format\n" + " - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n" + " - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n"; + + static size_t getMaxNumberOfArguments() { return 8; } + + String getName() const override + { + return name; + } + + virtual String getSignature() const + { + return signature; + } + + bool hasStaticStructure() const override { return configuration.structure != "auto"; } + + bool needStructureHint() const override { return configuration.structure == "auto"; } + + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + + bool supportsReadingSubsetOfColumns(const ContextPtr & context) override; + + std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override + { + return {"_path", "_file"}; + } + + virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context); + + static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context); + +protected: + + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "Azure"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + mutable StorageAzureBlob::Configuration configuration; + ColumnsDescription structure_hint; +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp new file mode 100644 index 0000000000..3926669c9d --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp @@ -0,0 +1,85 @@ +#include "clickhouse_config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Storages/StorageAzureBlob.h> + +#include "registerTableFunctions.h" + +#include <memory> + + +namespace DB +{ + +StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl( + const ASTPtr & /*function*/, ContextPtr context, + const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + StoragePtr storage; + ColumnsDescription columns; + bool structure_argument_was_provided = configuration.structure != "auto"; + + if (structure_argument_was_provided) + { + columns = parseColumnsListFromString(configuration.structure, context); + } + else if (!structure_hint.empty()) + { + columns = structure_hint; + } + + auto client = StorageAzureBlob::createClient(configuration, !is_insert_query); + auto settings = StorageAzureBlob::createSettings(context); + + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + /// On worker node this filename won't contains globs + storage = std::make_shared<StorageAzureBlob>( + configuration, + std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + /* comment */String{}, + /* format_settings */std::nullopt, /// No format_settings + /* distributed_processing */ true, + /*partition_by_=*/nullptr); + } + else + { + storage = std::make_shared<StorageAzureBlobCluster>( + cluster_name, + configuration, + std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)), + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context, + structure_argument_was_provided); + } + + storage->startup(); + + return storage; +} + + +void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionAzureBlobStorageCluster>( + {.documentation + = {.description=R"(The table function can be used to read the data stored on Azure Blob Storage in parallel for many nodes in a specified cluster.)", + .examples{{"azureBlobStorageCluster", "SELECT * FROM azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])", ""}}}, + .allow_readonly = false} + ); +} + + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h new file mode 100644 index 0000000000..02ae7069b5 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h @@ -0,0 +1,55 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionAzureBlobStorage.h> +#include <TableFunctions/ITableFunctionCluster.h> +#include <Storages/StorageAzureBlobCluster.h> + + +namespace DB +{ + +class Context; + +/** + * azureBlobStorageCluster(cluster_name, source, [access_key_id, secret_access_key,] format, compression_method, structure) + * A table function, which allows to process many files from Azure Blob Storage on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks + * in Azure Blob Storage file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. + */ +class TableFunctionAzureBlobStorageCluster : public ITableFunctionCluster<TableFunctionAzureBlobStorage> +{ +public: + static constexpr auto name = "azureBlobStorageCluster"; + static constexpr auto signature = " - cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + +protected: + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "AzureBlobStorageCluster"; } +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionDeltaLake.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionDeltaLake.cpp new file mode 100644 index 0000000000..08b851fb74 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionDeltaLake.cpp @@ -0,0 +1,33 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 && USE_PARQUET + +#include <Storages/DataLakes/StorageDeltaLake.h> +#include <TableFunctions/ITableFunctionDataLake.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionS3.h> +#include "registerTableFunctions.h" + +namespace DB +{ + +struct TableFunctionDeltaLakeName +{ + static constexpr auto name = "deltaLake"; +}; + +using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLakeS3, TableFunctionS3>; + +void registerTableFunctionDeltaLake(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionDeltaLake>( + {.documentation = { + .description=R"(The table function can be used to read the DeltaLake table stored on object store.)", + .examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.cpp new file mode 100644 index 0000000000..f0060acb41 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.cpp @@ -0,0 +1,95 @@ +#include <TableFunctions/TableFunctionDictionary.h> + +#include <Parsers/ASTLiteral.h> + +#include <DataTypes/DataTypeArray.h> +#include <DataTypes/DataTypeString.h> +#include <DataTypes/DataTypesNumber.h> + +#include <Interpreters/Context.h> +#include <Interpreters/ExternalDictionariesLoader.h> +#include <Interpreters/evaluateConstantExpression.h> + +#include <Storages/StorageDictionary.h> +#include <Storages/checkAndGetLiteralArgument.h> + +#include <TableFunctions/TableFunctionFactory.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void TableFunctionDictionary::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + // Parse args + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function ({}) must have arguments.", quoteString(getName())); + + ASTs & args = args_func.at(0)->children; + + if (args.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function ({}) requires 1 arguments", quoteString(getName())); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + dictionary_name = checkAndGetLiteralArgument<String>(args[0], "dictionary_name"); +} + +ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + const ExternalDictionariesLoader & external_loader = context->getExternalDictionariesLoader(); + std::string resolved_name = external_loader.resolveDictionaryName(dictionary_name, context->getCurrentDatabase()); + auto load_result = external_loader.load(resolved_name); + if (load_result) + { + /// for regexp tree dictionary, the table structure will be different with dictionary structure. it is: + /// - id. identifier of the tree node + /// - parent_id. + /// - regexp. the regular expression + /// - keys. the names of attributions of dictionary structure + /// - values. the values of each attribution + const auto dictionary = std::static_pointer_cast<const IDictionary>(load_result); + if (dictionary->getTypeName() == "RegExpTree") + { + return ColumnsDescription(NamesAndTypesList({ + {"id", std::make_shared<DataTypeUInt64>()}, + {"parent_id", std::make_shared<DataTypeUInt64>()}, + {"regexp", std::make_shared<DataTypeString>()}, + {"keys", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, + {"values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())} + })); + } + } + + /// otherwise, we get table structure by dictionary structure. + auto dictionary_structure = external_loader.getDictionaryStructure(dictionary_name, context); + return ColumnsDescription(StorageDictionary::getNamesAndTypes(dictionary_structure)); + +} + +StoragePtr TableFunctionDictionary::executeImpl( + const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const +{ + StorageID dict_id(getDatabaseName(), table_name); + auto dictionary_table_structure = getActualTableStructure(context, is_insert_query); + + auto result = std::make_shared<StorageDictionary>( + dict_id, dictionary_name, std::move(dictionary_table_structure), String{}, StorageDictionary::Location::Custom, context); + + return result; +} + +void registerTableFunctionDictionary(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionDictionary>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.h b/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.h new file mode 100644 index 0000000000..d0beb292fe --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionDictionary.h @@ -0,0 +1,30 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + +namespace DB +{ +class Context; + +/// dictionary(dictionary_name) - creates a temporary storage from dictionary +class TableFunctionDictionary final : public ITableFunction +{ +public: + static constexpr auto name = "dictionary"; + std::string getName() const override + { + return name; + } + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "Dictionary"; } + +private: + String dictionary_name; + ColumnsDescription dictionary_columns; +};} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.cpp new file mode 100644 index 0000000000..5a64a98815 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.cpp @@ -0,0 +1,148 @@ +#include <TableFunctions/TableFunctionExecutable.h> + +#include <Common/Exception.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Analyzer/TableFunctionNode.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTIdentifier.h> +#include <Parsers/ASTSelectWithUnionQuery.h> +#include <Parsers/ASTSetQuery.h> +#include <Parsers/ASTSubquery.h> +#include <Parsers/parseQuery.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/StorageExecutable.h> +#include <DataTypes/DataTypeFactory.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/interpretSubquery.h> +#include <boost/algorithm/string.hpp> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +std::vector<size_t> TableFunctionExecutable::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const +{ + const auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); + const auto & table_function_node_arguments = table_function_node.getArguments().getNodes(); + size_t table_function_node_arguments_size = table_function_node_arguments.size(); + + if (table_function_node_arguments_size <= 3) + return {}; + + std::vector<size_t> result_indexes; + result_indexes.reserve(table_function_node_arguments_size - 3); + for (size_t i = 3; i < table_function_node_arguments_size; ++i) + result_indexes.push_back(i); + + return result_indexes; +} + +void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto * function = ast_function->as<ASTFunction>(); + + if (!function->arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Table function '{}' must have arguments", + getName()); + + auto args = function->arguments->children; + + if (args.size() < 3) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{}' requires minimum 3 arguments: script_name, format, structure, [input_query...]", + getName()); + + auto check_argument = [&](size_t i, const std::string & argument_name) + { + if (!args[i]->as<ASTIdentifier>() && + !args[i]->as<ASTLiteral>() && + !args[i]->as<ASTQueryParameter>() && + !args[i]->as<ASTSubquery>()) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type of argument '{}' for table function '{}': must be an identifier or string literal", + argument_name, getName()); + }; + + check_argument(0, "script_name"); + check_argument(1, "format"); + check_argument(2, "structure"); + + for (size_t i = 0; i <= 2; ++i) + args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); + + auto script_name_with_arguments_value = checkAndGetLiteralArgument<String>(args[0], "script_name_with_arguments_value"); + + std::vector<String> script_name_with_arguments; + boost::split(script_name_with_arguments, script_name_with_arguments_value, [](char c){ return c == ' '; }); + + script_name = std::move(script_name_with_arguments[0]); + script_name_with_arguments.erase(script_name_with_arguments.begin()); + arguments = std::move(script_name_with_arguments); + format = checkAndGetLiteralArgument<String>(args[1], "format"); + structure = checkAndGetLiteralArgument<String>(args[2], "structure"); + + for (size_t i = 3; i < args.size(); ++i) + { + if (args[i]->as<ASTSetQuery>()) + { + settings_query = std::move(args[i]); + } + else + { + ASTPtr query; + if (!args[i]->children.empty()) + query = args[i]->children.at(0); + + if (query && query->as<ASTSelectWithUnionQuery>()) + { + input_queries.emplace_back(std::move(query)); + } + else + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' argument is invalid {}", + getName(), + args[i]->formatForErrorMessage()); + } + } + } +} + +ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + return parseColumnsListFromString(structure, context); +} + +StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto storage_id = StorageID(getDatabaseName(), table_name); + auto global_context = context->getGlobalContext(); + ExecutableSettings settings; + settings.script_name = script_name; + settings.script_arguments = arguments; + if (settings_query != nullptr) + settings.applyChanges(settings_query->as<ASTSetQuery>()->changes); + + auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context, is_insert_query), ConstraintsDescription{}); + storage->startup(); + return storage; +} + +void registerTableFunctionExecutable(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionExecutable>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.h b/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.h new file mode 100644 index 0000000000..aa595312fe --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionExecutable.h @@ -0,0 +1,44 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + +namespace DB +{ + +class Context; +class ASTSetQuery; + +/* executable(script_name_optional_arguments, format, structure, input_query) - creates a temporary storage from executable file + * + * + * The file must be in the clickhouse data directory. + * The relative path begins with the clickhouse data directory. + */ +class TableFunctionExecutable : public ITableFunction +{ +public: + static constexpr auto name = "executable"; + + std::string getName() const override { return name; } + + bool hasStaticStructure() const override { return true; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "Executable"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + String script_name; + std::vector<String> arguments; + String format; + String structure; + std::vector<ASTPtr> input_queries; + ASTPtr settings_query = nullptr; +}; +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.cpp new file mode 100644 index 0000000000..f127979d92 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.cpp @@ -0,0 +1,162 @@ +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTSelectWithUnionQuery.h> +#include <Parsers/ASTSetQuery.h> +#include <Parsers/ParserSetQuery.h> +#include <Parsers/parseQuery.h> +#include <Parsers/queryToString.h> +#include <Storages/StorageValues.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionExplain.h> +#include <TableFunctions/registerTableFunctions.h> +#include <Processors/Executors/PullingPipelineExecutor.h> +#include <Analyzer/TableFunctionNode.h> +#include <Interpreters/InterpreterSetQuery.h> +#include <Interpreters/Context.h> + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; +} + +std::vector<size_t> TableFunctionExplain::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr /*context*/) const +{ + const auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); + const auto & table_function_node_arguments = table_function_node.getArguments().getNodes(); + size_t table_function_node_arguments_size = table_function_node_arguments.size(); + + if (table_function_node_arguments_size == 3) + return {2}; + + return {}; +} + +void TableFunctionExplain::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/) +{ + const auto * function = ast_function->as<ASTFunction>(); + if (!function || !function->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' cannot be called directly, use `SELECT * FROM (EXPLAIN ...)` syntax", getName()); + + size_t num_args = function->arguments->children.size(); + if (num_args != 2 && num_args != 3) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' requires 2 or 3 arguments, got {}", getName(), num_args); + + const auto & kind_arg = function->arguments->children[0]; + const auto * kind_literal = kind_arg->as<ASTLiteral>(); + if (!kind_literal || kind_literal->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' requires a String argument for EXPLAIN kind, got '{}'", + getName(), queryToString(kind_arg)); + + ASTExplainQuery::ExplainKind kind = ASTExplainQuery::fromString(kind_literal->value.get<String>()); + auto explain_query = std::make_shared<ASTExplainQuery>(kind); + + const auto * settings_arg = function->arguments->children[1]->as<ASTLiteral>(); + if (!settings_arg || settings_arg->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' requires a serialized string settings argument, got '{}'", + getName(), queryToString(function->arguments->children[1])); + + const auto & settings_str = settings_arg->value.get<String>(); + if (!settings_str.empty()) + { + constexpr UInt64 max_size = 4096; + constexpr UInt64 max_depth = 16; + + /// parse_only_internals_ = true - we don't want to parse `SET` keyword + ParserSetQuery settings_parser(/* parse_only_internals_ = */ true); + ASTPtr settings_ast = parseQuery(settings_parser, settings_str, max_size, max_depth); + explain_query->setSettings(std::move(settings_ast)); + } + + if (function->arguments->children.size() > 2) + { + const auto & query_arg = function->arguments->children[2]; + if (!query_arg->as<ASTSelectWithUnionQuery>()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' requires a EXPLAIN SELECT query argument, got EXPLAIN '{}'", + getName(), queryToString(query_arg)); + explain_query->setExplainedQuery(query_arg); + } + else if (kind != ASTExplainQuery::ExplainKind::CurrentTransaction) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' requires a query argument", getName()); + } + + query = std::move(explain_query); +} + +ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + Block sample_block = getInterpreter(context).getSampleBlock(query->as<ASTExplainQuery>()->getKind()); + ColumnsDescription columns_description; + for (const auto & column : sample_block.getColumnsWithTypeAndName()) + columns_description.add(ColumnDescription(column.name, column.type)); + return columns_description; +} + +static Block executeMonoBlock(QueryPipeline & pipeline) +{ + if (!pipeline.pulling()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected pulling pipeline"); + + PullingPipelineExecutor pulling_executor(pipeline); + std::vector<Block> blocks; + while (true) + { + Block block; + if (pulling_executor.pull(block)) + blocks.push_back(std::move(block)); + else + break; + } + + if (blocks.size() == 1) + return blocks[0]; + + return concatenateBlocks(blocks); +} + +StoragePtr TableFunctionExplain::executeImpl( + const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + /// To support settings inside explain subquery. + auto mutable_context = Context::createCopy(context); + InterpreterSetQuery::applySettingsFromQuery(query, mutable_context); + BlockIO blockio = getInterpreter(mutable_context).execute(); + Block block = executeMonoBlock(blockio.pipeline); + + StorageID storage_id(getDatabaseName(), table_name); + auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context, is_insert_query), std::move(block)); + storage->startup(); + return storage; +} + +InterpreterExplainQuery TableFunctionExplain::getInterpreter(ContextPtr context) const +{ + if (!query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' requires a explain query argument", getName()); + + return InterpreterExplainQuery(query, context); +} + +void registerTableFunctionExplain(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionExplain>({.documentation = { + .description=R"( + Returns result of EXPLAIN query. + The function should not be called directly but can be invoked via `SELECT * FROM (EXPLAIN <query>)`. + You can use this query to process the result of EXPLAIN further using SQL (e.g., in tests). + Example: + [example:1] + )", + .examples={{"1", "SELECT explain FROM (EXPLAIN AST SELECT * FROM system.numbers) WHERE explain LIKE '%Asterisk%'", ""}} + }}); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.h b/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.h new file mode 100644 index 0000000000..2eb7e35d0b --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionExplain.h @@ -0,0 +1,35 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <Parsers/ASTExplainQuery.h> +#include <Interpreters/InterpreterExplainQuery.h> +#include <base/types.h> + + +namespace DB +{ + +class TableFunctionExplain : public ITableFunction +{ +public: + static constexpr auto name = "viewExplain"; + + std::string getName() const override { return name; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "Explain"; } + + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + InterpreterExplainQuery getInterpreter(ContextPtr context) const; + + ASTPtr query = nullptr; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.cpp new file mode 100644 index 0000000000..ce3daff078 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.cpp @@ -0,0 +1,121 @@ +#include <TableFunctions/TableFunctionFactory.h> + +#include <Interpreters/Context.h> +#include <Common/CurrentThread.h> +#include <Common/Exception.h> +#include <Common/KnownObjectNames.h> +#include <IO/WriteHelpers.h> +#include <Parsers/ASTFunction.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_FUNCTION; + extern const int LOGICAL_ERROR; +} + +void TableFunctionFactory::registerFunction( + const std::string & name, Value value, CaseSensitiveness case_sensitiveness) +{ + if (!table_functions.emplace(name, value).second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "TableFunctionFactory: the table function name '{}' is not unique", name); + + if (case_sensitiveness == CaseInsensitive + && !case_insensitive_table_functions.emplace(Poco::toLower(name), value).second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "TableFunctionFactory: " + "the case insensitive table function name '{}' is not unique", name); + + KnownTableFunctionNames::instance().add(name, (case_sensitiveness == CaseInsensitive)); +} + +TableFunctionPtr TableFunctionFactory::get( + const ASTPtr & ast_function, + ContextPtr context) const +{ + const auto * table_function = ast_function->as<ASTFunction>(); + auto res = tryGet(table_function->name, context); + if (!res) + { + auto hints = getHints(table_function->name); + if (!hints.empty()) + throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}. Maybe you meant: {}", table_function->name, toString(hints)); + else + throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", table_function->name); + } + + res->parseArguments(ast_function, context); + return res; +} + +TableFunctionPtr TableFunctionFactory::tryGet( + const std::string & name_param, + ContextPtr) const +{ + String name = getAliasToOrName(name_param); + TableFunctionPtr res; + + auto it = table_functions.find(name); + if (table_functions.end() != it) + { + res = it->second.creator(); + } + else + { + it = case_insensitive_table_functions.find(Poco::toLower(name)); + if (case_insensitive_table_functions.end() != it) + res = it->second.creator(); + } + + if (!res) + return nullptr; + + if (CurrentThread::isInitialized()) + { + auto query_context = CurrentThread::get().getQueryContext(); + if (query_context && query_context->getSettingsRef().log_queries) + query_context->addQueryFactoriesInfo(Context::QueryLogFactories::TableFunction, name); + } + + return res; +} + +bool TableFunctionFactory::isTableFunctionName(const std::string & name) const +{ + return table_functions.contains(name); +} + +std::optional<TableFunctionProperties> TableFunctionFactory::tryGetProperties(const String & name) const +{ + return tryGetPropertiesImpl(name); +} + +std::optional<TableFunctionProperties> TableFunctionFactory::tryGetPropertiesImpl(const String & name_param) const +{ + String name = getAliasToOrName(name_param); + Value found; + + /// Find by exact match. + if (auto it = table_functions.find(name); it != table_functions.end()) + { + found = it->second; + } + + if (auto jt = case_insensitive_table_functions.find(Poco::toLower(name)); jt != case_insensitive_table_functions.end()) + found = jt->second; + + if (found.creator) + return found.properties; + + return {}; +} + +TableFunctionFactory & TableFunctionFactory::instance() +{ + static TableFunctionFactory ret; + return ret; +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.h b/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.h new file mode 100644 index 0000000000..2cc648ba18 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFactory.h @@ -0,0 +1,87 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <Common/IFactoryWithAliases.h> +#include <Common/NamePrompter.h> + +#include <functional> +#include <memory> +#include <string> +#include <unordered_map> +#include <boost/noncopyable.hpp> + + +namespace DB +{ + +class Context; + +using TableFunctionCreator = std::function<TableFunctionPtr()>; + +struct TableFunctionFactoryData +{ + TableFunctionCreator creator; + TableFunctionProperties properties; + + TableFunctionFactoryData() = default; + TableFunctionFactoryData(const TableFunctionFactoryData &) = default; + TableFunctionFactoryData & operator = (const TableFunctionFactoryData &) = default; + + template <typename Creator> + requires (!std::is_same_v<Creator, TableFunctionFactoryData>) + TableFunctionFactoryData(Creator creator_, TableFunctionProperties properties_ = {}) /// NOLINT + : creator(std::forward<Creator>(creator_)), properties(std::move(properties_)) + { + } +}; + + +/** Lets you get a table function by its name. + */ +class TableFunctionFactory final: private boost::noncopyable, public IFactoryWithAliases<TableFunctionFactoryData> +{ +public: + static TableFunctionFactory & instance(); + + /// Register a function by its name. + /// No locking, you must register all functions before usage of get. + void registerFunction( + const std::string & name, + Value value, + CaseSensitiveness case_sensitiveness = CaseSensitive); + + template <typename Function> + void registerFunction(TableFunctionProperties properties = {}, CaseSensitiveness case_sensitiveness = CaseSensitive) + { + auto creator = []() -> TableFunctionPtr { return std::make_shared<Function>(); }; + registerFunction(Function::name, + TableFunctionFactoryData{std::move(creator), {std::move(properties)}} , + case_sensitiveness); + } + + /// Throws an exception if not found. + TableFunctionPtr get(const ASTPtr & ast_function, ContextPtr context) const; + + /// Returns nullptr if not found. + TableFunctionPtr tryGet(const std::string & name, ContextPtr context) const; + + std::optional<TableFunctionProperties> tryGetProperties(const String & name) const; + + bool isTableFunctionName(const std::string & name) const; + +private: + using TableFunctions = std::unordered_map<std::string, Value>; + + const TableFunctions & getMap() const override { return table_functions; } + + const TableFunctions & getCaseInsensitiveMap() const override { return case_insensitive_table_functions; } + + String getFactoryName() const override { return "TableFunctionFactory"; } + + std::optional<TableFunctionProperties> tryGetPropertiesImpl(const String & name) const; + + TableFunctions table_functions; + TableFunctions case_insensitive_table_functions; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFile.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionFile.cpp new file mode 100644 index 0000000000..f75c56e652 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFile.cpp @@ -0,0 +1,119 @@ +#include <TableFunctions/TableFunctionFile.h> +#include <Interpreters/parseColumnsListForTableFunction.h> + +#include "Parsers/IAST_fwd.h" +#include "registerTableFunctions.h" +#include <Access/Common/AccessFlags.h> +#include <Interpreters/Context.h> +#include <Storages/ColumnsDescription.h> +#include <Storages/StorageFile.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Formats/FormatFactory.h> +#include <Parsers/ASTIdentifier_fwd.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr & context) +{ + if (context->getApplicationType() != Context::ApplicationType::LOCAL) + { + ITableFunctionFileLike::parseFirstArguments(arg, context); + StorageFile::parseFileSource(std::move(filename), filename, path_to_archive); + return; + } + + const auto * literal = arg->as<ASTLiteral>(); + auto type = literal->value.getType(); + if (type == Field::Types::String) + { + filename = literal->value.safeGet<String>(); + if (filename == "stdin" || filename == "-") + fd = STDIN_FILENO; + else if (filename == "stdout") + fd = STDOUT_FILENO; + else if (filename == "stderr") + fd = STDERR_FILENO; + else + StorageFile::parseFileSource(std::move(filename), filename, path_to_archive); + } + else if (type == Field::Types::Int64 || type == Field::Types::UInt64) + { + fd = static_cast<int>( + (type == Field::Types::Int64) ? literal->value.get<Int64>() : literal->value.get<UInt64>()); + if (fd < 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "File descriptor must be non-negative"); + } + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The first argument of table function '{}' mush be path or file descriptor", getName()); +} + +String TableFunctionFile::getFormatFromFirstArgument() +{ + if (fd >= 0) + return FormatFactory::instance().getFormatFromFileDescriptor(fd); + else + return FormatFactory::instance().getFormatFromFileName(filename, true); +} + +StoragePtr TableFunctionFile::getStorage(const String & source, + const String & format_, const ColumnsDescription & columns, + ContextPtr global_context, const std::string & table_name, + const std::string & compression_method_) const +{ + // For `file` table function, we are going to use format settings from the + // query context. + StorageFile::CommonArguments args{ + WithContext(global_context), + StorageID(getDatabaseName(), table_name), + format_, + std::nullopt /*format settings*/, + compression_method_, + columns, + ConstraintsDescription{}, + String{}, + global_context->getSettingsRef().rename_files_after_processing, + path_to_archive, + }; + + if (fd >= 0) + return std::make_shared<StorageFile>(fd, args); + + return std::make_shared<StorageFile>(source, global_context->getUserFilesPath(), args); +} + +ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + if (fd >= 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema inference is not supported for table function '{}' with file descriptor", getName()); + size_t total_bytes_to_read = 0; + + Strings paths; + std::optional<StorageFile::ArchiveInfo> archive_info; + if (path_to_archive.empty()) + paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read); + else + archive_info + = StorageFile::getArchiveInfo(path_to_archive, filename, context->getUserFilesPath(), context, total_bytes_to_read); + + return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context, archive_info); + } + + + return parseColumnsListFromString(structure, context); +} + +void registerTableFunctionFile(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionFile>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFile.h b/contrib/clickhouse/src/TableFunctions/TableFunctionFile.h new file mode 100644 index 0000000000..439ae87b4a --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFile.h @@ -0,0 +1,42 @@ +#pragma once + +#include <TableFunctions/ITableFunctionFileLike.h> + + +namespace DB +{ + +/* file(path, format[, structure, compression]) - creates a temporary storage from file + * + * The file must be in the clickhouse data directory. + * The relative path begins with the clickhouse data directory. + */ +class TableFunctionFile : public ITableFunctionFileLike +{ +public: + static constexpr auto name = "file"; + std::string getName() const override + { + return name; + } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override + { + return {"_path", "_file"}; + } + +protected: + int fd = -1; + void parseFirstArguments(const ASTPtr & arg, const ContextPtr & context) override; + String getFormatFromFirstArgument() override; + +private: + StoragePtr getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const std::string & compression_method_) const override; + const char * getStorageTypeName() const override { return "File"; } +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.cpp new file mode 100644 index 0000000000..3afe7ffde5 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.cpp @@ -0,0 +1,175 @@ +#include <Formats/ReadSchemaUtils.h> + +#include <IO/ReadBufferFromString.h> + +#include <Interpreters/Context.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/parseColumnsListForTableFunction.h> + +#include <Parsers/ASTLiteral.h> + +#include <Processors/Executors/PullingPipelineExecutor.h> +#include <Processors/Formats/IInputFormat.h> +#include <Processors/Transforms/AddingDefaultsTransform.h> + +#include <QueryPipeline/Pipe.h> +#include <QueryPipeline/QueryPipelineBuilder.h> + +#include <Storages/StorageValues.h> +#include <Storages/checkAndGetLiteralArgument.h> + +#include <TableFunctions/TableFunctionFormat.h> +#include <TableFunctions/TableFunctionFactory.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments", getName()); + + ASTs & args = args_func.at(0)->children; + + if (args.size() != 2 && args.size() != 3) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 2 or 3 arguments: format, [structure], data", getName()); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + format = checkAndGetLiteralArgument<String>(args[0], "format"); + data = checkAndGetLiteralArgument<String>(args.back(), "data"); + if (args.size() == 3) + structure = checkAndGetLiteralArgument<String>(args[1], "structure"); +} + +ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + SingleReadBufferIterator read_buffer_iterator(std::make_unique<ReadBufferFromString>(data)); + return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, false, context); + } + return parseColumnsListFromString(structure, context); +} + +Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const +{ + Block block; + for (const auto & name_and_type : columns.getAllPhysical()) + block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); + + auto read_buf = std::make_unique<ReadBufferFromString>(data); + auto input_format = context->getInputFormat(format, *read_buf, block, context->getSettingsRef().max_block_size); + QueryPipelineBuilder builder; + builder.init(Pipe(input_format)); + if (columns.hasDefaults()) + { + builder.addSimpleTransform([&](const Block & header) + { + return std::make_shared<AddingDefaultsTransform>(header, columns, *input_format, context); + }); + } + + auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder))); + auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline); + + std::vector<Block> blocks; + while (reader->pull(block)) + blocks.push_back(std::move(block)); + + if (blocks.size() == 1) + return blocks[0]; + + /// In case when data contains more then 1 block we combine + /// them all to one big block (this is considered a rare case). + return concatenateBlocks(blocks); +} + +StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto columns = getActualTableStructure(context, is_insert_query); + Block res_block = parseData(columns, context); + auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block); + res->startup(); + return res; +} + +static const FunctionDocumentation format_table_function_documentation = +{ + .description=R"( +Extracts table structure from data and parses it according to specified input format. +Syntax: `format(format_name, data)`. +Parameters: + - `format_name` - the format of the data. + - `data ` - String literal or constant expression that returns a string containing data in specified format. +Returned value: A table with data parsed from `data` argument according specified format and extracted schema. +)", + .examples + { + { + "First example", + R"( +Query: +``` +:) select * from format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +Result: +``` +┌───b─┬─a─────┐ +│ 111 │ Hello │ +│ 123 │ World │ +│ 112 │ Hello │ +│ 124 │ World │ +└─────┴───────┘ +``` +)", "" + }, + { + "Second example", + R"( +Query: +``` +:) desc format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +Result: +``` +┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐ +│ b │ Nullable(Float64) │ │ │ │ │ │ +│ a │ Nullable(String) │ │ │ │ │ │ +└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘ +``` +)", "" + }, + }, + .categories{"format", "table-functions"} +}; + +void registerTableFunctionFormat(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionFormat>({format_table_function_documentation, false}, TableFunctionFactory::CaseInsensitive); +} +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.h b/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.h new file mode 100644 index 0000000000..e20e8b6ea4 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionFormat.h @@ -0,0 +1,34 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + + +namespace DB +{ + +class Context; + +/* format(format_name, data) - ... + */ +class TableFunctionFormat : public ITableFunction +{ +public: + static constexpr auto name = "format"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return false; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Values"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + Block parseData(ColumnsDescription columns, ContextPtr context) const; + + String format; + String data; + String structure = "auto"; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.cpp new file mode 100644 index 0000000000..c6a9154cc6 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.cpp @@ -0,0 +1,132 @@ +#include <Common/Exception.h> + +#include <Storages/StorageGenerateRandom.h> +#include <Storages/checkAndGetLiteralArgument.h> + +#include <Parsers/ASTExpressionList.h> +#include <Parsers/ASTLiteral.h> + +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionGenerateRandom.h> +#include <Functions/FunctionGenerateRandomStructure.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Interpreters/evaluateConstantExpression.h> + +#include <Common/randomSeed.h> + +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int LOGICAL_ERROR; +} + +void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", getName()); + + ASTs & args = args_func.at(0)->children; + + if (args.empty()) + return; + + /// First, check if first argument is structure or seed. + const auto * first_arg_literal = args[0]->as<const ASTLiteral>(); + bool first_argument_is_structure = !first_arg_literal || first_arg_literal->value.getType() == Field::Types::String; + size_t max_args = first_argument_is_structure ? 4 : 3; + + if (args.size() > max_args) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{}' requires at most four (or three if structure is missing) arguments: " + " [structure, random_seed, max_string_length, max_array_length].", getName()); + + if (first_argument_is_structure) + { + /// Allow constant expression for structure argument, it can be generated using generateRandomStructure function. + args[0] = evaluateConstantExpressionAsLiteral(args[0], context); + } + + // All the arguments must be literals. + for (const auto & arg : args) + { + if (!arg->as<const ASTLiteral>()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "All arguments of table function '{}' except structure argument must be literals. " + "Got '{}' instead", getName(), arg->formatForErrorMessage()); + } + } + + size_t arg_index = 0; + + if (first_argument_is_structure) + { + /// Parsing first argument as table structure and creating a sample block + structure = checkAndGetLiteralArgument<String>(args[arg_index], "structure"); + ++arg_index; + } + + if (args.size() >= arg_index + 1) + { + const auto & literal = args[arg_index]->as<const ASTLiteral &>(); + ++arg_index; + if (!literal.value.isNull()) + random_seed = checkAndGetLiteralArgument<UInt64>(literal, "random_seed"); + } + + if (args.size() >= arg_index + 1) + { + max_string_length = checkAndGetLiteralArgument<UInt64>(args[arg_index], "max_string_length"); + ++arg_index; + } + + if (args.size() == arg_index + 1) + { + max_array_length = checkAndGetLiteralArgument<UInt64>(args[arg_index], "max_string_length"); + ++arg_index; + } +} + +ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + if (structure_hint.empty()) + { + auto random_structure = FunctionGenerateRandomStructure::generateRandomStructure(random_seed.value_or(randomSeed()), context); + return parseColumnsListFromString(random_structure, context); + } + + return structure_hint; + } + + return parseColumnsListFromString(structure, context); +} + +StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + ColumnsDescription columns = getActualTableStructure(context, is_insert_query); + auto res = std::make_shared<StorageGenerateRandom>( + StorageID(getDatabaseName(), table_name), columns, String{}, max_array_length, max_string_length, random_seed); + res->startup(); + return res; +} + +void registerTableFunctionGenerate(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionGenerateRandom>({.documentation = {}, .allow_readonly = true}); +} + +} + + diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.h b/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.h new file mode 100644 index 0000000000..a5d11ce0af --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionGenerateRandom.h @@ -0,0 +1,36 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + +namespace DB +{ + +/* generateRandom([structure, max_array_length, max_string_length, random_seed]) + * - creates a temporary storage that generates columns with random data + */ +class TableFunctionGenerateRandom : public ITableFunction +{ +public: + static constexpr auto name = "generateRandom"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return structure != "auto"; } + + bool needStructureHint() const override { return structure == "auto"; } + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "GenerateRandom"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + String structure = "auto"; + UInt64 max_string_length = 10; + UInt64 max_array_length = 10; + std::optional<UInt64> random_seed; + ColumnsDescription structure_hint; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionHDFS.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFS.cpp new file mode 100644 index 0000000000..050559b16f --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFS.cpp @@ -0,0 +1,48 @@ +#include "clickhouse_config.h" +#include "registerTableFunctions.h" + +#if USE_HDFS +#include <Storages/HDFS/StorageHDFS.h> +#include <Storages/ColumnsDescription.h> +#include <TableFunctions/TableFunctionFactory.h> +#error #include <TableFunctions/TableFunctionHDFS.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Interpreters/Context.h> +#include <Access/Common/AccessFlags.h> + +namespace DB +{ + +StoragePtr TableFunctionHDFS::getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method_) const +{ + return std::make_shared<StorageHDFS>( + source, + StorageID(getDatabaseName(), table_name), + format_, + columns, + ConstraintsDescription{}, + String{}, + global_context, + compression_method_); +} + +ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + context->checkAccess(getSourceAccessType()); + return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context); + } + + return parseColumnsListFromString(structure, context); +} + +void registerTableFunctionHDFS(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionHDFS>(); +} + +} +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.cpp new file mode 100644 index 0000000000..0b531d9132 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.cpp @@ -0,0 +1,61 @@ +#include "clickhouse_config.h" + +#if USE_HDFS + +#include <TableFunctions/TableFunctionHDFSCluster.h> +#include <TableFunctions/TableFunctionFactory.h> + +#include <Storages/HDFS/StorageHDFSCluster.h> +#include <Storages/HDFS/StorageHDFS.h> +#include "registerTableFunctions.h" + +#include <memory> + + +namespace DB +{ + +StoragePtr TableFunctionHDFSCluster::getStorage( + const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, + const std::string & table_name, const String & /*compression_method_*/) const +{ + StoragePtr storage; + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + /// On worker node this uri won't contains globs + storage = std::make_shared<StorageHDFS>( + filename, + StorageID(getDatabaseName(), table_name), + format, + columns, + ConstraintsDescription{}, + String{}, + context, + compression_method, + /*distributed_processing=*/true, + nullptr); + } + else + { + storage = std::make_shared<StorageHDFSCluster>( + context, + cluster_name, + filename, + StorageID(getDatabaseName(), table_name), + format, + columns, + ConstraintsDescription{}, + compression_method, + structure != "auto"); + } + return storage; +} + +void registerTableFunctionHDFSCluster(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionHDFSCluster>(); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.h b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.h new file mode 100644 index 0000000000..0bd2d05f0f --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionHDFSCluster.h @@ -0,0 +1,54 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_HDFS + +#include <TableFunctions/ITableFunctionFileLike.h> +#error #include <TableFunctions/TableFunctionHDFS.h> +#include <TableFunctions/ITableFunctionCluster.h> + + +namespace DB +{ + +class Context; + +/** + * hdfsCluster(cluster, URI, format, structure, compression_method) + * A table function, which allows to process many files from HDFS on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks + * in HDFS file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. + */ +class TableFunctionHDFSCluster : public ITableFunctionCluster<TableFunctionHDFS> +{ +public: + static constexpr auto name = "hdfsCluster"; + static constexpr auto signature = " - cluster_name, uri\n" + " - cluster_name, uri, format\n" + " - cluster_name, uri, format, structure\n" + " - cluster_name, uri, format, structure, compression_method\n"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + +protected: + StoragePtr getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method_) const override; + + const char * getStorageTypeName() const override { return "HDFSCluster"; } +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionHudi.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionHudi.cpp new file mode 100644 index 0000000000..f692efe977 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionHudi.cpp @@ -0,0 +1,31 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <Storages/DataLakes/StorageHudi.h> +#include <TableFunctions/ITableFunctionDataLake.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionS3.h> +#include "registerTableFunctions.h" + +namespace DB +{ + +struct TableFunctionHudiName +{ + static constexpr auto name = "hudi"; +}; +using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudiS3, TableFunctionS3>; + +void registerTableFunctionHudi(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionHudi>( + {.documentation + = {.description=R"(The table function can be used to read the Hudi table stored on object store.)", + .examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionIceberg.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionIceberg.cpp new file mode 100644 index 0000000000..eba19680c7 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionIceberg.cpp @@ -0,0 +1,34 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 && USE_AVRO + +#include <Storages/DataLakes/StorageIceberg.h> +#include <TableFunctions/ITableFunctionDataLake.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionS3.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +struct TableFunctionIcebergName +{ + static constexpr auto name = "iceberg"; +}; + +using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIcebergS3, TableFunctionS3>; + +void registerTableFunctionIceberg(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionIceberg>( + {.documentation + = {.description=R"(The table function can be used to read the Iceberg table stored on object store.)", + .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionInput.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionInput.cpp new file mode 100644 index 0000000000..658a55c6fc --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionInput.cpp @@ -0,0 +1,73 @@ +#include <TableFunctions/TableFunctionInput.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTLiteral.h> +#include <Common/Exception.h> +#include <Storages/StorageInput.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <DataTypes/DataTypeFactory.h> +#include <Interpreters/evaluateConstantExpression.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; +} + +void TableFunctionInput::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto * function = ast_function->as<ASTFunction>(); + + if (!function->arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments", getName()); + + auto args = function->arguments->children; + + if (args.empty()) + { + structure = "auto"; + return; + } + + if (args.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{}' requires exactly 1 argument: structure", getName()); + + structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context), "structure"); +} + +ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + if (structure_hint.empty()) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Table function '{}' was used without structure argument but structure could not be determined automatically. Please, " + "provide structure manually", + getName()); + return structure_hint; + } + return parseColumnsListFromString(structure, context); +} + +StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto storage = std::make_shared<StorageInput>(StorageID(getDatabaseName(), table_name), getActualTableStructure(context, is_insert_query)); + storage->startup(); + return storage; +} + +void registerTableFunctionInput(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionInput>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionInput.h b/contrib/clickhouse/src/TableFunctions/TableFunctionInput.h new file mode 100644 index 0000000000..3164ce43ee --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionInput.h @@ -0,0 +1,33 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + + +namespace DB +{ + +class Context; + +/* input(structure) - allows to make INSERT SELECT from incoming stream of data + */ +class TableFunctionInput : public ITableFunction +{ +public: + static constexpr auto name = "input"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return true; } + bool needStructureHint() const override { return true; } + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Input"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + String structure; + ColumnsDescription structure_hint; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.cpp new file mode 100644 index 0000000000..01840a8026 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.cpp @@ -0,0 +1,34 @@ +#include <memory> +#include <Parsers/ASTFunction.h> +#include <Storages/MeiliSearch/StorageMeiliSearch.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionMeiliSearch.h> +#include <Common/Exception.h> + +namespace DB +{ +StoragePtr TableFunctionMeiliSearch::executeImpl( + const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + return std::make_shared<StorageMeiliSearch>( + StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{}); +} + +ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */, bool /*is_insert_query*/) const +{ + return StorageMeiliSearch::getTableStructureFromData(configuration.value()); +} + + +void TableFunctionMeiliSearch::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & func_args = ast_function->as<ASTFunction &>(); + configuration = StorageMeiliSearch::getConfiguration(func_args.arguments->children, context); +} + +void registerTableFunctionMeiliSearch(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionMeiliSearch>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.h b/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.h new file mode 100644 index 0000000000..a127809a9c --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMeiliSearch.h @@ -0,0 +1,26 @@ +#pragma once +#include <Storages/MeiliSearch/MeiliSearchConnection.h> +#include <TableFunctions/ITableFunction.h> + +namespace DB +{ + +class TableFunctionMeiliSearch : public ITableFunction +{ +public: + static constexpr auto name = "meilisearch"; + String getName() const override { return name; } + +private: + StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "meilisearch"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + std::optional<MeiliSearchConfiguration> configuration; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.cpp new file mode 100644 index 0000000000..599953a1ad --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.cpp @@ -0,0 +1,187 @@ +#include <Common/OptimizedRegularExpression.h> +#include <Common/typeid_cast.h> +#include <Storages/StorageMerge.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTFunction.h> +#include <TableFunctions/ITableFunction.h> +#include <Analyzer/FunctionNode.h> +#include <Analyzer/TableFunctionNode.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> +#include <Access/ContextAccess.h> +#include <TableFunctions/TableFunctionMerge.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/registerTableFunctions.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + [[noreturn]] void throwNoTablesMatchRegexp(const String & source_database_regexp, const String & source_table_regexp) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Error while executing table function merge. Either there is no database, which matches regular expression `{}`, or there are " + "no tables in database matches `{}`, which fit tables expression: {}", + source_database_regexp, + source_database_regexp, + source_table_regexp); + } +} + +std::vector<size_t> TableFunctionMerge::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const +{ + auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); + auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes(); + size_t table_function_arguments_size = table_function_arguments_nodes.size(); + + std::vector<size_t> result; + + for (size_t i = 0; i < table_function_arguments_size; ++i) + { + auto * function_node = table_function_arguments_nodes[i]->as<FunctionNode>(); + if (function_node && function_node->getFunctionName() == "REGEXP") + result.push_back(i); + } + + return result; +} + +void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function 'merge' requires exactly 2 arguments - name " + "of source database and regexp for table names."); + + ASTs & args = args_func.at(0)->children; + + if (args.size() != 2) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function 'merge' requires exactly 2 arguments - name " + "of source database and regexp for table names."); + + auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(args[0], context); + + database_is_regexp = is_regexp; + + if (!is_regexp) + args[0] = database_ast; + source_database_name_or_regexp = checkAndGetLiteralArgument<String>(database_ast, "database_name"); + + args[1] = evaluateConstantExpressionAsLiteral(args[1], context); + source_table_regexp = checkAndGetLiteralArgument<String>(args[1], "table_name_regexp"); +} + + +const TableFunctionMerge::DBToTableSetMap & TableFunctionMerge::getSourceDatabasesAndTables(ContextPtr context) const +{ + if (source_databases_and_tables) + return *source_databases_and_tables; + + source_databases_and_tables.emplace(); + + /// database_name is not a regexp + if (!database_is_regexp) + { + auto source_tables = getMatchedTablesWithAccess(source_database_name_or_regexp, source_table_regexp, context); + if (source_tables.empty()) + throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp); + (*source_databases_and_tables)[source_database_name_or_regexp] = source_tables; + } + + /// database_name is a regexp + else + { + OptimizedRegularExpression database_re(source_database_name_or_regexp); + auto databases = DatabaseCatalog::instance().getDatabases(); + + for (const auto & db : databases) + if (database_re.match(db.first)) + (*source_databases_and_tables)[db.first] = getMatchedTablesWithAccess(db.first, source_table_regexp, context); + + if (source_databases_and_tables->empty()) + throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp); + } + + return *source_databases_and_tables; +} + +ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + for (const auto & db_with_tables : getSourceDatabasesAndTables(context)) + { + for (const auto & table : db_with_tables.second) + { + auto storage = DatabaseCatalog::instance().tryGetTable(StorageID{db_with_tables.first, table}, context); + if (storage) + return ColumnsDescription{storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical()}; + } + } + + throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp); +} + + +StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto res = std::make_shared<StorageMerge>( + StorageID(getDatabaseName(), table_name), + getActualTableStructure(context, is_insert_query), + String{}, + source_database_name_or_regexp, + database_is_regexp, + getSourceDatabasesAndTables(context), + context); + + res->startup(); + return res; +} + +TableFunctionMerge::TableSet +TableFunctionMerge::getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context) +{ + OptimizedRegularExpression table_re(table_regexp); + + auto table_name_match = [&](const String & table_name) { return table_re.match(table_name); }; + + auto access = context->getAccess(); + + auto database = DatabaseCatalog::instance().getDatabase(database_name); + + bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, database_name); + bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, database_name); + + TableSet tables; + + for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next()) + { + if (!it->table()) + continue; + bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, database_name, it->name()); + if (!granted_show) + continue; + if (!granted_select_on_all_tables) + access->checkAccess(AccessType::SELECT, database_name, it->name()); + tables.emplace(it->name()); + } + return tables; +} + +void registerTableFunctionMerge(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionMerge>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.h b/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.h new file mode 100644 index 0000000000..8cc5119978 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMerge.h @@ -0,0 +1,38 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + + +namespace DB +{ + +/* merge (db_name, tables_regexp) - creates a temporary StorageMerge. + * The structure of the table is taken from the first table that came up, suitable for regexp. + * If there is no such table, an exception is thrown. + */ +class TableFunctionMerge : public ITableFunction +{ +public: + static constexpr auto name = "merge"; + std::string getName() const override { return name; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Merge"; } + + using TableSet = std::set<String>; + using DBToTableSetMap = std::map<String, TableSet>; + const DBToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const; + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context); + + String source_database_name_or_regexp; + String source_table_regexp; + bool database_is_regexp = false; + mutable std::optional<DBToTableSetMap> source_databases_and_tables; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.cpp new file mode 100644 index 0000000000..5400ce8126 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.cpp @@ -0,0 +1,108 @@ +#if 0 +#include <TableFunctions/TableFunctionMongoDB.h> + +#include <Common/Exception.h> + +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> + +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTIdentifier.h> + +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <TableFunctions/registerTableFunctions.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/ColumnsDescription.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + + +StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/, + ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto columns = getActualTableStructure(context, is_insert_query); + auto storage = std::make_shared<StorageMongoDB>( + StorageID(configuration->database, table_name), + configuration->host, + configuration->port, + configuration->database, + configuration->table, + configuration->username, + configuration->password, + configuration->options, + columns, + ConstraintsDescription(), + String{}); + storage->startup(); + return storage; +} + +ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + return parseColumnsListFromString(structure, context); +} + +void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & func_args = ast_function->as<ASTFunction &>(); + if (!func_args.arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'mongodb' must have arguments."); + + ASTs & args = func_args.arguments->children; + + if (args.size() < 6 || args.size() > 7) + { + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function 'mongodb' requires from 6 to 7 parameters: " + "mongodb('host:port', database, collection, 'user', 'password', structure, [, 'options'])"); + } + + ASTs main_arguments(args.begin(), args.begin() + 5); + + for (size_t i = 5; i < args.size(); ++i) + { + if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get())) + { + const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get()); + auto function_args = args_expr->children; + if (function_args.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument"); + + auto arg_name = function_args[0]->as<ASTIdentifier>()->name(); + + if (arg_name == "structure") + structure = checkAndGetLiteralArgument<String>(function_args[1], "structure"); + else if (arg_name == "options") + main_arguments.push_back(function_args[1]); + } + else if (i == 5) + { + structure = checkAndGetLiteralArgument<String>(args[i], "structure"); + } + else if (i == 6) + { + main_arguments.push_back(args[i]); + } + } + + configuration = StorageMongoDB::getConfiguration(main_arguments, context); +} + + +void registerTableFunctionMongoDB(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionMongoDB>(); +} + +} +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.h b/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.h new file mode 100644 index 0000000000..c2c15cabe5 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMongoDB.h @@ -0,0 +1,31 @@ +#pragma once + +#include <Storages/StorageMongoDB.h> +#include <TableFunctions/ITableFunction.h> +#include <Storages/ExternalDataSourceConfiguration.h> + +namespace DB +{ + +class TableFunctionMongoDB : public ITableFunction +{ +public: + static constexpr auto name = "mongodb"; + + std::string getName() const override { return name; } + +private: + StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, + const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "MongoDB"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + std::optional<StorageMongoDB::Configuration> configuration; + String structure; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionMySQL.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionMySQL.cpp new file mode 100644 index 0000000000..360c5ae1ff --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionMySQL.cpp @@ -0,0 +1,98 @@ +#include "clickhouse_config.h" + +#if USE_MYSQL +#error #include <Processors/Sources/MySQLSource.h> +#include <Interpreters/Context.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTFunction.h> +#include <Storages/MySQL/MySQLSettings.h> +#include <Storages/MySQL/MySQLHelpers.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#error #include <TableFunctions/TableFunctionMySQL.h> +#include <Common/Exception.h> +#include <Common/parseAddress.h> +#include <Common/quoteString.h> +#include "registerTableFunctions.h" + +#include <Databases/MySQL/DatabaseMySQL.h> +#include <Common/parseRemoteDescription.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & args_func = ast_function->as<ASTFunction &>(); + + if (!args_func.arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function 'mysql' must have arguments."); + + auto & args = args_func.arguments->children; + + MySQLSettings mysql_settings; + + const auto & settings = context->getSettingsRef(); + mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec; + mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; + + for (auto * it = args.begin(); it != args.end(); ++it) + { + const ASTSetQuery * settings_ast = (*it)->as<ASTSetQuery>(); + if (settings_ast) + { + mysql_settings.loadFromQuery(*settings_ast); + args.erase(it); + break; + } + } + + configuration = StorageMySQL::getConfiguration(args, context, mysql_settings); + pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings)); +} + +ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context); +} + +StoragePtr TableFunctionMySQL::executeImpl( + const ASTPtr & /*ast_function*/, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool /*is_insert_query*/) const +{ + auto res = std::make_shared<StorageMySQL>( + StorageID(getDatabaseName(), table_name), + std::move(*pool), + configuration->database, + configuration->table, + configuration->replace_query, + configuration->on_duplicate_clause, + cached_columns, + ConstraintsDescription{}, + String{}, + context, + MySQLSettings{}); + + pool.reset(); + + res->startup(); + return res; +} + + +void registerTableFunctionMySQL(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionMySQL>(); +} +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionNull.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionNull.cpp new file mode 100644 index 0000000000..57911e16d4 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionNull.cpp @@ -0,0 +1,61 @@ +#include <Interpreters/Context.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTFunction.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/StorageNull.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionNull.h> +#include <Interpreters/evaluateConstantExpression.h> +#include "registerTableFunctions.h" + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void TableFunctionNull::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto * function = ast_function->as<ASTFunction>(); + if (!function || !function->arguments) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'structure'", getName()); + + const auto & arguments = function->arguments->children; + if (!arguments.empty() && arguments.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{}' requires 'structure' argument or empty argument", getName()); + + if (!arguments.empty()) + structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context), "structure"); +} + +ColumnsDescription TableFunctionNull::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure != "auto") + return parseColumnsListFromString(structure, context); + return default_structure; +} + +StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + ColumnsDescription columns; + if (structure != "auto") + columns = parseColumnsListFromString(structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + else + columns = default_structure; + + auto res = std::make_shared<StorageNull>(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription(), String{}); + res->startup(); + return res; +} + +void registerTableFunctionNull(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionNull>({.documentation = {}, .allow_readonly = true}); +} +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionNull.h b/contrib/clickhouse/src/TableFunctions/TableFunctionNull.h new file mode 100644 index 0000000000..e80552d4cf --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionNull.h @@ -0,0 +1,37 @@ +#pragma once + +#include <Core/Types.h> +#include <DataTypes/DataTypesNumber.h> +#include <TableFunctions/ITableFunction.h> + + +namespace DB +{ + +/* null(structure) - creates a temporary null storage + * + * Used for testing purposes, for convenience writing tests and demos. + */ +class TableFunctionNull : public ITableFunction +{ +public: + static constexpr auto name = "null"; + std::string getName() const override { return name; } + + bool needStructureHint() const override { return structure == "auto"; } + + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Null"; } + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + String structure = "auto"; + ColumnsDescription structure_hint; + + const ColumnsDescription default_structure{NamesAndTypesList{{"dummy", std::make_shared<DataTypeUInt8>()}}}; +}; +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.cpp new file mode 100644 index 0000000000..d6cf50bc7d --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.cpp @@ -0,0 +1,74 @@ +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionNumbers.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Parsers/ASTFunction.h> +#include <Common/typeid_cast.h> +#include <Common/FieldVisitorToString.h> +#include <Storages/System/StorageSystemNumbers.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/convertFieldToType.h> +#include <Interpreters/Context.h> +#include <DataTypes/DataTypesNumber.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + + +template <bool multithreaded> +ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const +{ + /// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418 + return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}}; +} + +template <bool multithreaded> +StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + if (const auto * function = ast_function->as<ASTFunction>()) + { + auto arguments = function->arguments->children; + + if (arguments.size() != 1 && arguments.size() != 2) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName()); + + UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0; + UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]); + + auto res = std::make_shared<StorageSystemNumbers>(StorageID(getDatabaseName(), table_name), multithreaded, length, offset, false); + res->startup(); + return res; + } + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'limit' or 'offset, limit'.", getName()); +} + +void registerTableFunctionNumbers(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionNumbers<true>>({.documentation = {}, .allow_readonly = true}); + factory.registerFunction<TableFunctionNumbers<false>>({.documentation = {}, .allow_readonly = true}); +} + +template <bool multithreaded> +UInt64 TableFunctionNumbers<multithreaded>::evaluateArgument(ContextPtr context, ASTPtr & argument) const +{ + const auto & [field, type] = evaluateConstantExpression(argument, context); + + if (!isNativeNumber(type)) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} expression, must be numeric type", type->getName()); + + Field converted = convertFieldToType(field, DataTypeUInt64()); + if (converted.isNull()) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The value {} is not representable as UInt64", + applyVisitor(FieldVisitorToString(), field)); + + return converted.safeGet<UInt64>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.h b/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.h new file mode 100644 index 0000000000..e380f40f7b --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionNumbers.h @@ -0,0 +1,31 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <base/types.h> + + +namespace DB +{ + +/* numbers(limit), numbers_mt(limit) + * - the same as SELECT number FROM system.numbers LIMIT limit. + * Used for testing purposes, as a simple example of table function. + */ +template <bool multithreaded> +class TableFunctionNumbers : public ITableFunction +{ +public: + static constexpr auto name = multithreaded ? "numbers_mt" : "numbers"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return true; } +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "SystemNumbers"; } + + UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.cpp new file mode 100644 index 0000000000..6ed5883c3c --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -0,0 +1,71 @@ +#include <TableFunctions/TableFunctionPostgreSQL.h> + +#if USE_LIBPQXX +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTFunction.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Common/Exception.h> +#include "registerTableFunctions.h" +#include <Common/parseRemoteDescription.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + + +StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/, + ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool /*is_insert_query*/) const +{ + auto result = std::make_shared<StoragePostgreSQL>( + StorageID(getDatabaseName(), table_name), + connection_pool, + configuration->table, + cached_columns, + ConstraintsDescription{}, + String{}, + context, + configuration->schema, + configuration->on_conflict); + + result->startup(); + return result; +} + + +ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + return StoragePostgreSQL::getTableStructureFromData(connection_pool, configuration->table, configuration->schema, context); +} + + +void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & func_args = ast_function->as<ASTFunction &>(); + if (!func_args.arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'PostgreSQL' must have arguments."); + + configuration.emplace(StoragePostgreSQL::getConfiguration(func_args.arguments->children, context)); + const auto & settings = context->getSettingsRef(); + connection_pool = std::make_shared<postgres::PoolWithFailover>( + *configuration, + settings.postgresql_connection_pool_size, + settings.postgresql_connection_pool_wait_timeout, + POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, + settings.postgresql_connection_pool_auto_close_connection); +} + + +void registerTableFunctionPostgreSQL(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionPostgreSQL>(); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.h b/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.h new file mode 100644 index 0000000000..8559409c60 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionPostgreSQL.h @@ -0,0 +1,36 @@ +#pragma once +#include "clickhouse_config.h" + +#if USE_LIBPQXX +#include <TableFunctions/ITableFunction.h> +#include <Core/PostgreSQL/PoolWithFailover.h> +#include <Storages/ExternalDataSourceConfiguration.h> +#include <Storages/StoragePostgreSQL.h> + + +namespace DB +{ + +class TableFunctionPostgreSQL : public ITableFunction +{ +public: + static constexpr auto name = "postgresql"; + std::string getName() const override { return name; } + +private: + StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, + const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "PostgreSQL"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + postgres::PoolWithFailoverPtr connection_pool; + std::optional<StoragePostgreSQL::Configuration> configuration; +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.cpp new file mode 100644 index 0000000000..0b7433845b --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.cpp @@ -0,0 +1,94 @@ +#include <TableFunctions/TableFunctionRedis.h> + +#include <Common/Exception.h> +#include <Common/parseAddress.h> + +#include <Interpreters/Context.h> + +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTIdentifier.h> + +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Storages/ColumnsDescription.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/registerTableFunctions.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Interpreters/evaluateConstantExpression.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +StoragePtr TableFunctionRedis::executeImpl( + const ASTPtr & /*ast_function*/, ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto columns = getActualTableStructure(context, is_insert_query); + + StorageInMemoryMetadata metadata; + metadata.setColumns(columns); + + String db_name = "redis" + getDatabaseName() + "_db_" + toString(configuration.db_index); + auto storage = std::make_shared<StorageRedis>( + StorageID(db_name, table_name), configuration, context, metadata, primary_key); + storage->startup(); + return storage; +} + +ColumnsDescription TableFunctionRedis::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + return parseColumnsListFromString(structure, context); +} + +void TableFunctionRedis::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & func_args = ast_function->as<ASTFunction &>(); + if (!func_args.arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'redis' must have arguments."); + + ASTs & args = func_args.arguments->children; + + if (args.size() < 3) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad arguments count when creating Redis table function"); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + auto parsed_host_port = parseAddress(checkAndGetLiteralArgument<String>(args[0], "host:port"), 6379); + configuration.host = parsed_host_port.first; + configuration.port = parsed_host_port.second; + + primary_key = checkAndGetLiteralArgument<String>(args[1], "key"); + structure = checkAndGetLiteralArgument<String>(args[2], "structure"); + + if (args.size() > 3) + configuration.db_index = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(args[3], "db_index")); + else + configuration.db_index = DEFAULT_REDIS_DB_INDEX; + if (args.size() > 4) + configuration.password = checkAndGetLiteralArgument<String>(args[4], "password"); + else + configuration.password = DEFAULT_REDIS_PASSWORD; + if (args.size() > 5) + configuration.pool_size = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(args[5], "pool_size")); + else + configuration.pool_size = DEFAULT_REDIS_POOL_SIZE; + + context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port)); + + auto columns = parseColumnsListFromString(structure, context); + if (!columns.has(primary_key)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad arguments redis table function structure should contains key."); +} + + +void registerTableFunctionRedis(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionRedis>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.h b/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.h new file mode 100644 index 0000000000..a7fc0df0a1 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionRedis.h @@ -0,0 +1,34 @@ +#pragma once + +#include <Storages/StorageRedis.h> +#include <TableFunctions/ITableFunction.h> +#include <Storages/ExternalDataSourceConfiguration.h> + +namespace DB +{ + +/* Implements Redis table function. + * Use redis(host:port, key, structure[, db_index[, password[, pool_size]]]); + */ +class TableFunctionRedis : public ITableFunction +{ +public: + static constexpr auto name = "redis"; + String getName() const override { return name; } + +private: + StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, + const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "Redis"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + RedisConfiguration configuration; + String structure; + String primary_key; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.cpp new file mode 100644 index 0000000000..1f42ce4ba3 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.cpp @@ -0,0 +1,370 @@ +#include "TableFunctionRemote.h" + +#include <Storages/getStructureOfRemoteTable.h> +#include <Storages/StorageDistributed.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <Parsers/ASTIdentifier_fwd.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTExpressionList.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Cluster.h> +#include <Interpreters/Context.h> +#include <Interpreters/IdentifierSemantic.h> +#include <Common/typeid_cast.h> +#include <Common/parseRemoteDescription.h> +#include <Common/Macros.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Core/Defines.h> +#include <base/range.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + + +void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + ASTs & args_func = ast_function->children; + + String cluster_name; + String cluster_description; + String database = "system"; + String table = "one"; /// The table containing one row is used by default for queries without explicit table specification. + String username = "default"; + String password; + + if (args_func.size() != 1) + throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + ASTs & args = args_func.at(0)->children; + + /** + * Number of arguments for remote function is 4. + * Number of arguments for cluster function is 6. + * For now named collection can be used only for remote as cluster does not require credentials. + */ + size_t max_args = is_cluster_function ? 4 : 6; + NamedCollectionPtr named_collection; + std::vector<std::pair<std::string, ASTPtr>> complex_args; + if (!is_cluster_function && (named_collection = tryGetNamedCollectionWithOverrides(args, context, false, &complex_args))) + { + validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>( + *named_collection, + {"addresses_expr", "host", "hostname", "table"}, + {"username", "user", "password", "sharding_key", "port", "database", "db"}); + + if (!complex_args.empty()) + { + for (const auto & [arg_name, arg_ast] : complex_args) + { + if (arg_name == "database" || arg_name == "db") + remote_table_function_ptr = arg_ast; + else if (arg_name == "sharding_key") + sharding_key = arg_ast; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected argument representation for {}", arg_name); + } + } + else + database = named_collection->getAnyOrDefault<String>({"db", "database"}, "default"); + + cluster_description = named_collection->getOrDefault<String>("addresses_expr", ""); + if (cluster_description.empty() && named_collection->hasAny({"host", "hostname"})) + cluster_description = named_collection->has("port") + ? named_collection->getAny<String>({"host", "hostname"}) + ':' + toString(named_collection->get<UInt64>("port")) + : named_collection->getAny<String>({"host", "hostname"}); + table = named_collection->get<String>("table"); + username = named_collection->getAnyOrDefault<String>({"username", "user"}, "default"); + password = named_collection->getOrDefault<String>("password", ""); + } + else + { + /// Supported signatures: + /// remote('addresses_expr') + /// remote('addresses_expr', db.table) + /// remote('addresses_expr', 'db', 'table') + /// remote('addresses_expr', db.table, 'user') + /// remote('addresses_expr', 'db', 'table', 'user') + /// remote('addresses_expr', db.table, 'user', 'password') + /// remote('addresses_expr', 'db', 'table', 'user', 'password') + /// remote('addresses_expr', db.table, sharding_key) + /// remote('addresses_expr', 'db', 'table', sharding_key) + /// remote('addresses_expr', db.table, 'user', sharding_key) + /// remote('addresses_expr', 'db', 'table', 'user', sharding_key) + /// remote('addresses_expr', db.table, 'user', 'password', sharding_key) + /// remote('addresses_expr', 'db', 'table', 'user', 'password', sharding_key) + /// + /// remoteSecure() - same as remote() + /// + /// cluster() + /// cluster('cluster_name') + /// cluster('cluster_name', db.table) + /// cluster('cluster_name', 'db', 'table') + /// cluster('cluster_name', db.table, sharding_key) + /// cluster('cluster_name', 'db', 'table', sharding_key) + /// + /// clusterAllReplicas() - same as cluster() + + if ((!is_cluster_function && args.empty()) || args.size() > max_args) + throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + size_t arg_num = 0; + auto get_string_literal = [](const IAST & node, String & res) + { + const auto * lit = node.as<ASTLiteral>(); + if (!lit) + return false; + + if (lit->value.getType() != Field::Types::String) + return false; + + res = lit->value.safeGet<const String &>(); + return true; + }; + + if (is_cluster_function) + { + if (!args.empty()) + { + args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); + cluster_name = checkAndGetLiteralArgument<String>(args[arg_num], "cluster_name"); + } + else + { + cluster_name = "default"; + } + } + else + { + if (!tryGetIdentifierNameInto(args[arg_num], cluster_name)) + { + if (!get_string_literal(*args[arg_num], cluster_description)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Hosts pattern must be string literal (in single quotes)."); + } + } + + ++arg_num; + + /// Names of database and table is not necessary. + if (arg_num < args.size()) + { + const auto * function = args[arg_num]->as<ASTFunction>(); + if (function && TableFunctionFactory::instance().isTableFunctionName(function->name)) + { + remote_table_function_ptr = args[arg_num]; + ++arg_num; + } + else + { + args[arg_num] = evaluateConstantExpressionForDatabaseName(args[arg_num], context); + database = checkAndGetLiteralArgument<String>(args[arg_num], "database"); + + ++arg_num; + + auto qualified_name = QualifiedTableName::parseFromString(database); + if (qualified_name.database.empty()) + { + if (arg_num >= args.size()) + { + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table name was not found in function arguments. {}", static_cast<const std::string>(help_message)); + } + else + { + std::swap(qualified_name.database, qualified_name.table); + args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); + qualified_name.table = checkAndGetLiteralArgument<String>(args[arg_num], "table"); + ++arg_num; + } + } + + database = std::move(qualified_name.database); + table = std::move(qualified_name.table); + + /// Cluster function may have sharding key for insert + if (is_cluster_function && arg_num < args.size()) + { + sharding_key = args[arg_num]; + ++arg_num; + } + } + } + + /// Username and password parameters are prohibited in cluster version of the function + if (!is_cluster_function) + { + if (arg_num < args.size()) + { + if (!get_string_literal(*args[arg_num], username)) + { + username = "default"; + sharding_key = args[arg_num]; + } + ++arg_num; + } + + if (arg_num < args.size() && !sharding_key) + { + if (!get_string_literal(*args[arg_num], password)) + { + sharding_key = args[arg_num]; + } + ++arg_num; + } + + if (arg_num < args.size()) + { + if (sharding_key) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Arguments `user` and `password` should be string literals (in single quotes)"); + sharding_key = args[arg_num]; + ++arg_num; + } + } + + if (arg_num < args.size()) + { + throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } + } + + if (!cluster_name.empty()) + { + /// Use an existing cluster from the main config + if (name != "clusterAllReplicas") + cluster = context->getCluster(cluster_name); + else + cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); + } + else + { + /// Create new cluster from the scratch + size_t max_addresses = context->getSettingsRef().table_function_remote_max_addresses; + std::vector<String> shards = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses); + + std::vector<std::vector<String>> names; + names.reserve(shards.size()); + for (const auto & shard : shards) + names.push_back(parseRemoteDescription(shard, 0, shard.size(), '|', max_addresses)); + + if (names.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Shard list is empty after parsing first argument"); + + auto maybe_secure_port = context->getTCPPortSecure(); + + /// Check host and port on affiliation allowed hosts. + for (const auto & hosts : names) + { + for (const auto & host : hosts) + { + size_t colon = host.find(':'); + if (colon == String::npos) + context->getRemoteHostFilter().checkHostAndPort( + host, + toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort()))); + else + context->getRemoteHostFilter().checkHostAndPort(host.substr(0, colon), host.substr(colon + 1)); + } + } + + bool treat_local_as_remote = false; + bool treat_local_port_as_remote = context->getApplicationType() == Context::ApplicationType::LOCAL; + ClusterConnectionParameters params{ + username, + password, + static_cast<UInt16>(secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort()), + treat_local_as_remote, + treat_local_port_as_remote, + secure, + /* priority= */ Priority{1}, + /* cluster_name= */ "", + /* cluster_secret= */ "" + }; + cluster = std::make_shared<Cluster>(context->getSettingsRef(), names, params); + } + + if (!remote_table_function_ptr && table.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The name of remote table cannot be empty"); + + remote_table_id.database_name = database; + remote_table_id.table_name = table; +} + +StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const +{ + /// StorageDistributed supports mismatching structure of remote table, so we can use outdated structure for CREATE ... AS remote(...) + /// without additional conversion in StorageTableFunctionProxy + if (cached_columns.empty()) + cached_columns = getActualTableStructure(context, is_insert_query); + + assert(cluster); + StoragePtr res = remote_table_function_ptr + ? std::make_shared<StorageDistributed>( + StorageID(getDatabaseName(), table_name), + cached_columns, + ConstraintsDescription{}, + remote_table_function_ptr, + String{}, + context, + sharding_key, + String{}, + String{}, + DistributedSettings{}, + false, + cluster) + : std::make_shared<StorageDistributed>( + StorageID(getDatabaseName(), table_name), + cached_columns, + ConstraintsDescription{}, + String{}, + remote_table_id.database_name, + remote_table_id.table_name, + String{}, + context, + sharding_key, + String{}, + String{}, + DistributedSettings{}, + false, + cluster); + + res->startup(); + return res; +} + +ColumnsDescription TableFunctionRemote::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + assert(cluster); + return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr); +} + +TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_) + : name{name_}, secure{secure_} +{ + is_cluster_function = (name == "cluster" || name == "clusterAllReplicas"); + help_message = PreformattedMessage::create( + "Table function '{}' requires from {} to {} parameters: " + "{}", + name, + is_cluster_function ? 0 : 1, + is_cluster_function ? 4 : 6, + is_cluster_function ? "[<cluster name or default if not specify>, <name of remote database>, <name of remote table>] [, sharding_key]" + : "<addresses pattern> [, <name of remote database>, <name of remote table>] [, username[, password], sharding_key]"); +} + +void registerTableFunctionRemote(TableFunctionFactory & factory) +{ + factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); }); + factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote", /* secure = */ true); }); + factory.registerFunction("cluster", {[] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); }, {.documentation = {}, .allow_readonly = true}}); + factory.registerFunction("clusterAllReplicas", {[] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("clusterAllReplicas"); }, {.documentation = {}, .allow_readonly = true}}); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.h b/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.h new file mode 100644 index 0000000000..0f75bf2b85 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionRemote.h @@ -0,0 +1,47 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <Interpreters/Cluster.h> +#include <Interpreters/StorageID.h> + + +namespace DB +{ + +/* remote ('address', db, table) - creates a temporary StorageDistributed. + * To get the table structure, a DESC TABLE request is made to the remote server. + * For example: + * SELECT count() FROM remote('example01-01-1', merge, hits) - go to `example01-01-1`, in the merge database, the hits table. + * An expression that generates a set of shards and replicas can also be specified as the host name - see below. + * Also, there is a cluster version of the function: cluster('existing_cluster_name', 'db', 'table'). + */ +class TableFunctionRemote : public ITableFunction +{ +public: + explicit TableFunctionRemote(const std::string & name_, bool secure_ = false); + + std::string getName() const override { return name; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + bool needStructureConversion() const override { return false; } + +private: + + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Distributed"; } + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + std::string name; + bool is_cluster_function; + PreformattedMessage help_message; + bool secure; + + ClusterPtr cluster; + StorageID remote_table_id = StorageID::createEmpty(); + ASTPtr remote_table_function_ptr; + ASTPtr sharding_key = nullptr; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionS3.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionS3.cpp new file mode 100644 index 0000000000..7f6867cc69 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionS3.cpp @@ -0,0 +1,437 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <IO/S3Common.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionS3.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Access/Common/AccessFlags.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTIdentifier.h> +#include <Parsers/ASTFunction.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/StorageS3.h> +#include <Storages/StorageURL.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <Formats/FormatFactory.h> +#include "registerTableFunctions.h" +#include <Analyzer/FunctionNode.h> +#include <Analyzer/TableFunctionNode.h> + +#include <boost/algorithm/string.hpp> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int LOGICAL_ERROR; +} + + +std::vector<size_t> TableFunctionS3::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const +{ + auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); + auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes(); + size_t table_function_arguments_size = table_function_arguments_nodes.size(); + + std::vector<size_t> result; + + for (size_t i = 0; i < table_function_arguments_size; ++i) + { + auto * function_node = table_function_arguments_nodes[i]->as<FunctionNode>(); + if (function_node && function_node->getFunctionName() == "headers") + result.push_back(i); + } + + return result; +} + +/// This is needed to avoid copy-pase. Because s3Cluster arguments only differ in additional argument (first) - cluster name +void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context) +{ + if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context)) + { + StorageS3::processNamedCollectionResult(configuration, *named_collection); + if (configuration.format == "auto") + { + String file_path = named_collection->getOrDefault<String>("filename", Poco::URI(named_collection->get<String>("url")).getPath()); + configuration.format = FormatFactory::instance().getFormatFromFileName(file_path, true); + } + } + else + { + + auto * header_it = StorageURL::collectHeaders(args, configuration.headers_from_ast, context); + if (header_it != args.end()) + args.erase(header_it); + + if (args.empty() || args.size() > 6) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature()); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + /// Size -> argument indexes + static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_args + { + {1, {{}}}, + {6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}} + }; + + std::unordered_map<std::string_view, size_t> args_to_idx; + + bool no_sign_request = false; + + /// For 2 arguments we support 2 possible variants: + /// - s3(source, format) + /// - s3(source, NOSIGN) + /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not. + if (args.size() == 2) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + no_sign_request = true; + else + args_to_idx = {{"format", 1}}; + } + /// For 3 arguments we support 3 possible variants: + /// - s3(source, format, structure) + /// - s3(source, access_key_id, access_key_id) + /// - s3(source, NOSIGN, format) + /// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not. + else if (args.size() == 3) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + { + no_sign_request = true; + args_to_idx = {{"format", 2}}; + } + else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) + args_to_idx = {{"format", 1}, {"structure", 2}}; + else + args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}}; + } + /// For 4 arguments we support 3 possible variants: + /// - s3(source, format, structure, compression_method), + /// - s3(source, access_key_id, access_key_id, format) + /// - s3(source, NOSIGN, format, structure) + /// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not. + else if (args.size() == 4) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + { + no_sign_request = true; + args_to_idx = {{"format", 2}, {"structure", 3}}; + } + else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) + args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}}; + else + args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}}; + } + /// For 5 arguments we support 2 possible variants: + /// - s3(source, access_key_id, access_key_id, format, structure) + /// - s3(source, NOSIGN, format, structure, compression_method) + /// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not. + else if (args.size() == 5) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "NOSIGN/access_key_id"); + if (boost::iequals(second_arg, "NOSIGN")) + { + no_sign_request = true; + args_to_idx = {{"format", 2}, {"structure", 3}, {"compression_method", 4}}; + } + else + args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}; + } + else + { + args_to_idx = size_to_args[args.size()]; + } + + /// This argument is always the first + String url = checkAndGetLiteralArgument<String>(args[0], "url"); + configuration.url = S3::URI(url); + + if (args_to_idx.contains("format")) + { + auto format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format"); + /// Set format to configuration only of it's not 'auto', + /// because we can have default format set in configuration. + if (format != "auto") + configuration.format = format; + } + + if (args_to_idx.contains("structure")) + configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure"); + + if (args_to_idx.contains("compression_method")) + configuration.compression_method = checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method"); + + if (args_to_idx.contains("access_key_id")) + configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id"); + + if (args_to_idx.contains("secret_access_key")) + configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key"); + + configuration.auth_settings.no_sign_request = no_sign_request; + + if (configuration.format == "auto") + configuration.format = FormatFactory::instance().getFormatFromFileName(Poco::URI(url).getPath(), true); + } + + configuration.keys = {configuration.url.key}; +} + +void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + /// Clone ast function, because we can modify its arguments like removing headers. + auto ast_copy = ast_function->clone(); + + /// Parse args + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName()); + + auto & args = args_func.at(0)->children; + + parseArgumentsImpl(args, context); +} + +void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context) +{ + if (tryGetNamedCollectionWithOverrides(args, context)) + { + /// In case of named collection, just add key-value pair "structure='...'" + /// at the end of arguments to override existed structure. + ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure)}; + auto equal_func = makeASTFunction("equals", std::move(equal_func_args)); + args.push_back(equal_func); + } + else + { + /// If arguments contain headers, just remove it and add to the end of arguments later + /// (header argument can be at any position). + HTTPHeaderEntries tmp_headers; + auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context); + ASTPtr headers_ast; + if (headers_it != args.end()) + { + headers_ast = *headers_it; + args.erase(headers_it); + } + + if (args.empty() || args.size() > getMaxNumberOfArguments()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size()); + + auto structure_literal = std::make_shared<ASTLiteral>(structure); + + /// s3(s3_url) + if (args.size() == 1) + { + /// Add format=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + /// s3(s3_url, format) or s3(s3_url, NOSIGN) + /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not. + else if (args.size() == 2) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN"); + /// If there is NOSIGN, add format=auto before structure. + if (boost::iequals(second_arg, "NOSIGN")) + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + /// s3(source, format, structure) or + /// s3(source, access_key_id, access_key_id) or + /// s3(source, NOSIGN, format) + /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither. + else if (args.size() == 3) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + { + args.push_back(structure_literal); + } + else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) + { + args.back() = structure_literal; + } + else + { + /// Add format=auto before structure argument. + args.push_back(std::make_shared<ASTLiteral>("auto")); + args.push_back(structure_literal); + } + } + /// s3(source, format, structure, compression_method) or + /// s3(source, access_key_id, access_key_id, format) or + /// s3(source, NOSIGN, format, structure) + /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither. + else if (args.size() == 4) + { + auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + { + args.back() = structure_literal; + } + else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) + { + args[args.size() - 2] = structure_literal; + } + else + { + args.push_back(structure_literal); + } + } + /// s3(source, access_key_id, access_key_id, format, structure) or + /// s3(source, NOSIGN, format, structure, compression_method) + /// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not. + else if (args.size() == 5) + { + auto sedond_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN"); + if (boost::iequals(sedond_arg, "NOSIGN")) + { + args[args.size() - 2] = structure_literal; + } + else + { + args.back() = structure_literal; + } + } + /// s3(source, access_key_id, access_key_id, format, structure, compression) + else if (args.size() == 6) + { + args[args.size() - 2] = structure_literal; + } + + if (headers_ast) + args.push_back(headers_ast); + } +} + +ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (configuration.structure == "auto") + { + context->checkAccess(getSourceAccessType()); + configuration.update(context); + return StorageS3::getTableStructureFromData(configuration, std::nullopt, context); + } + + return parseColumnsListFromString(configuration.structure, context); +} + +bool TableFunctionS3::supportsReadingSubsetOfColumns(const ContextPtr & context) +{ + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context); +} + +StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool /*is_insert_query*/) const +{ + S3::URI s3_uri (configuration.url); + + ColumnsDescription columns; + if (configuration.structure != "auto") + columns = parseColumnsListFromString(configuration.structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + else if (!cached_columns.empty()) + columns = cached_columns; + + StoragePtr storage = std::make_shared<StorageS3>( + configuration, + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + String{}, + /// No format_settings for table function S3 + std::nullopt); + + storage->startup(); + + return storage; +} + + +class TableFunctionGCS : public TableFunctionS3 +{ +public: + static constexpr auto name = "gcs"; + std::string getName() const override + { + return name; + } +private: + const char * getStorageTypeName() const override { return "GCS"; } +}; + +class TableFunctionCOS : public TableFunctionS3 +{ +public: + static constexpr auto name = "cosn"; + std::string getName() const override + { + return name; + } +private: + const char * getStorageTypeName() const override { return "COSN"; } +}; + +class TableFunctionOSS : public TableFunctionS3 +{ +public: + static constexpr auto name = "oss"; + std::string getName() const override + { + return name; + } +private: + const char * getStorageTypeName() const override { return "OSS"; } +}; + + +void registerTableFunctionGCS(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionGCS>( + {.documentation + = {.description=R"(The table function can be used to read the data stored on Google Cloud Storage.)", + .examples{{"gcs", "SELECT * FROM gcs(url, hmac_key, hmac_secret)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} + +void registerTableFunctionS3(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionS3>( + {.documentation + = {.description=R"(The table function can be used to read the data stored on AWS S3.)", + .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} + + +void registerTableFunctionCOS(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionCOS>(); +} + +void registerTableFunctionOSS(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionOSS>(); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionS3.h b/contrib/clickhouse/src/TableFunctions/TableFunctionS3.h new file mode 100644 index 0000000000..cb9fad6e6c --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionS3.h @@ -0,0 +1,85 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <TableFunctions/ITableFunction.h> +#include <Storages/StorageS3.h> + + +namespace DB +{ + +class Context; + +/* s3(source, [access_key_id, secret_access_key,] [format, structure, compression]) - creates a temporary storage for a file in S3. + */ +class TableFunctionS3 : public ITableFunction +{ +public: + static constexpr auto name = "s3"; + static constexpr auto signature = " - url\n" + " - url, format\n" + " - url, format, structure\n" + " - url, access_key_id, secret_access_key\n" + " - url, format, structure, compression_method\n" + " - url, access_key_id, secret_access_key, format\n" + " - url, access_key_id, secret_access_key, format, structure\n" + " - url, access_key_id, secret_access_key, format, structure, compression_method\n" + "All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)"; + + static size_t getMaxNumberOfArguments() { return 6; } + + String getName() const override + { + return name; + } + + virtual String getSignature() const + { + return signature; + } + + bool hasStaticStructure() const override { return configuration.structure != "auto"; } + + bool needStructureHint() const override { return configuration.structure == "auto"; } + + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + + bool supportsReadingSubsetOfColumns(const ContextPtr & context) override; + + std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override + { + return {"_path", "_file"}; + } + + virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context); + + static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context); + +protected: + + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "S3"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + mutable StorageS3::Configuration configuration; + ColumnsDescription structure_hint; + +private: + + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.cpp new file mode 100644 index 0000000000..4eb18851bb --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.cpp @@ -0,0 +1,74 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <TableFunctions/TableFunctionS3Cluster.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Storages/StorageS3.h> + +#include "registerTableFunctions.h" + +#include <memory> + + +namespace DB +{ + +StoragePtr TableFunctionS3Cluster::executeImpl( + const ASTPtr & /*function*/, ContextPtr context, + const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + StoragePtr storage; + ColumnsDescription columns; + bool structure_argument_was_provided = configuration.structure != "auto"; + + if (structure_argument_was_provided) + { + columns = parseColumnsListFromString(configuration.structure, context); + } + else if (!structure_hint.empty()) + { + columns = structure_hint; + } + + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + /// On worker node this filename won't contains globs + storage = std::make_shared<StorageS3>( + configuration, + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + /* comment */String{}, + /* format_settings */std::nullopt, /// No format_settings for S3Cluster + /*distributed_processing=*/true); + } + else + { + storage = std::make_shared<StorageS3Cluster>( + cluster_name, + configuration, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context, + structure_argument_was_provided); + } + + storage->startup(); + + return storage; +} + + +void registerTableFunctionS3Cluster(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionS3Cluster>(); +} + + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.h b/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.h new file mode 100644 index 0000000000..4b87f743cc --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionS3Cluster.h @@ -0,0 +1,63 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionS3.h> +#include <TableFunctions/ITableFunctionCluster.h> +#include <Storages/StorageS3Cluster.h> + + +namespace DB +{ + +class Context; + +/** + * s3cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure, compression_method) + * A table function, which allows to process many files from S3 on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks + * in S3 file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. + */ +class TableFunctionS3Cluster : public ITableFunctionCluster<TableFunctionS3> +{ +public: + static constexpr auto name = "s3Cluster"; + static constexpr auto signature = " - cluster, url\n" + " - cluster, url, format\n" + " - cluster, url, format, structure\n" + " - cluster, url, access_key_id, secret_access_key\n" + " - cluster, url, format, structure, compression_method\n" + " - cluster, url, access_key_id, secret_access_key, format\n" + " - cluster, url, access_key_id, secret_access_key, format, structure\n" + " - cluster, url, access_key_id, secret_access_key, format, structure, compression_method\n" + "All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + +protected: + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "S3Cluster"; } +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.cpp new file mode 100644 index 0000000000..36525ef922 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.cpp @@ -0,0 +1,80 @@ +#include <TableFunctions/TableFunctionSQLite.h> + +#if USE_SQLITE + +#include <Common/Exception.h> +#include <Common/quoteString.h> + +#error #include <Databases/SQLite/SQLiteUtils.h> +#include "registerTableFunctions.h" + +#include <Interpreters/evaluateConstantExpression.h> + +#include <Parsers/ASTFunction.h> + +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> + +#include <Storages/checkAndGetLiteralArgument.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + + +StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/, + ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool /*is_insert_query*/) const +{ + auto storage = std::make_shared<StorageSQLite>(StorageID(getDatabaseName(), table_name), + sqlite_db, + database_path, + remote_table_name, + cached_columns, ConstraintsDescription{}, context); + + storage->startup(); + return storage; +} + + +ColumnsDescription TableFunctionSQLite::getActualTableStructure(ContextPtr /* context */, bool /*is_insert_query*/) const +{ + return StorageSQLite::getTableStructureFromData(sqlite_db, remote_table_name); +} + + +void TableFunctionSQLite::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto & func_args = ast_function->as<ASTFunction &>(); + + if (!func_args.arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'sqlite' must have arguments."); + + ASTs & args = func_args.arguments->children; + + if (args.size() != 2) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "SQLite database requires 2 arguments: database path, table name"); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + database_path = checkAndGetLiteralArgument<String>(args[0], "database_path"); + remote_table_name = checkAndGetLiteralArgument<String>(args[1], "table_name"); + + sqlite_db = openSQLiteDB(database_path, context); +} + + +void registerTableFunctionSQLite(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionSQLite>(); +} + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.h b/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.h new file mode 100644 index 0000000000..f612bd1762 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionSQLite.h @@ -0,0 +1,34 @@ +#pragma once +#include "clickhouse_config.h" + +#if USE_SQLITE +#include <TableFunctions/ITableFunction.h> +#include <Storages/StorageSQLite.h> + + +namespace DB +{ + +class TableFunctionSQLite : public ITableFunction +{ +public: + static constexpr auto name = "sqlite"; + std::string getName() const override { return name; } + +private: + StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, + const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "SQLite"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + String database_path, remote_table_name; + std::shared_ptr<sqlite3> sqlite_db; +}; + +} + +#endif diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionURL.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionURL.cpp new file mode 100644 index 0000000000..8d5a023fc3 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionURL.cpp @@ -0,0 +1,146 @@ +#include <TableFunctions/TableFunctionURL.h> + +#include "registerTableFunctions.h" +#include <Access/Common/AccessFlags.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTIdentifier.h> +#include <Storages/ColumnsDescription.h> +#include <Storages/StorageExternalDistributed.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Analyzer/FunctionNode.h> +#include <Analyzer/TableFunctionNode.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Formats/FormatFactory.h> + +#include <IO/WriteHelpers.h> +#include <IO/WriteBufferFromVector.h> +namespace DB +{ + +std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const +{ + auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); + auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes(); + size_t table_function_arguments_size = table_function_arguments_nodes.size(); + + std::vector<size_t> result; + + for (size_t i = 0; i < table_function_arguments_size; ++i) + { + auto * function_node = table_function_arguments_nodes[i]->as<FunctionNode>(); + if (function_node && function_node->getFunctionName() == "headers") + result.push_back(i); + } + + return result; +} + +void TableFunctionURL::parseArguments(const ASTPtr & ast, ContextPtr context) +{ + /// Clone ast function, because we can modify it's arguments like removing headers. + ITableFunctionFileLike::parseArguments(ast->clone(), context); +} + +void TableFunctionURL::parseArgumentsImpl(ASTs & args, const ContextPtr & context) +{ + if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context)) + { + StorageURL::processNamedCollectionResult(configuration, *named_collection); + + filename = configuration.url; + structure = configuration.structure; + compression_method = configuration.compression_method; + + format = configuration.format; + if (format == "auto") + format = FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true); + + StorageURL::collectHeaders(args, configuration.headers, context); + } + else + { + auto * headers_it = StorageURL::collectHeaders(args, configuration.headers, context); + /// ITableFunctionFileLike cannot parse headers argument, so remove it. + if (headers_it != args.end()) + args.erase(headers_it); + + ITableFunctionFileLike::parseArgumentsImpl(args, context); + } +} + +void TableFunctionURL::addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context) +{ + if (tryGetNamedCollectionWithOverrides(args, context)) + { + /// In case of named collection, just add key-value pair "structure='...'" + /// at the end of arguments to override existed structure. + ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(desired_structure)}; + auto equal_func = makeASTFunction("equals", std::move(equal_func_args)); + args.push_back(equal_func); + } + else + { + /// If arguments contain headers, just remove it and add to the end of arguments later + /// (header argument can be at any position). + HTTPHeaderEntries tmp_headers; + auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context); + ASTPtr headers_ast; + if (headers_it != args.end()) + { + headers_ast = *headers_it; + args.erase(headers_it); + } + + ITableFunctionFileLike::addColumnsStructureToArguments(args, desired_structure, context); + + if (headers_ast) + args.push_back(headers_ast); + } +} + +StoragePtr TableFunctionURL::getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method_) const +{ + return std::make_shared<StorageURL>( + source, + StorageID(getDatabaseName(), table_name), + format_, + std::nullopt /*format settings*/, + columns, + ConstraintsDescription{}, + String{}, + global_context, + compression_method_, + configuration.headers, + configuration.http_method); +} + +ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + if (structure == "auto") + { + context->checkAccess(getSourceAccessType()); + return StorageURL::getTableStructureFromData(format, + filename, + chooseCompressionMethod(Poco::URI(filename).getPath(), compression_method), + configuration.headers, + std::nullopt, + context); + } + + return parseColumnsListFromString(structure, context); +} + +String TableFunctionURL::getFormatFromFirstArgument() +{ + return FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true); +} + +void registerTableFunctionURL(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionURL>(); +} +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionURL.h b/contrib/clickhouse/src/TableFunctions/TableFunctionURL.h new file mode 100644 index 0000000000..5e58a36dde --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionURL.h @@ -0,0 +1,58 @@ +#pragma once + +#include <TableFunctions/ITableFunctionFileLike.h> +#include <Storages/StorageURL.h> +#include <IO/ReadWriteBufferFromHTTP.h> + + +namespace DB +{ + +class Context; + +/* url(source, [format, structure, compression]) - creates a temporary storage from url. + */ +class TableFunctionURL : public ITableFunctionFileLike +{ +public: + static constexpr auto name = "url"; + static constexpr auto signature = " - uri\n" + " - uri, format\n" + " - uri, format, structure\n" + " - uri, format, structure, compression_method\n" + "All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + static void addColumnsStructureToArguments(ASTs & args, const String & desired_structure, const ContextPtr & context); + +protected: + void parseArguments(const ASTPtr & ast, ContextPtr context) override; + void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override; + + StorageURL::Configuration configuration; + +private: + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + + StoragePtr getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method_) const override; + + const char * getStorageTypeName() const override { return "URL"; } + + String getFormatFromFirstArgument() override; + +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.cpp new file mode 100644 index 0000000000..a294927815 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.cpp @@ -0,0 +1,54 @@ +#include <TableFunctions/TableFunctionURLCluster.h> +#include <TableFunctions/TableFunctionFactory.h> + +#include "registerTableFunctions.h" + +namespace DB +{ + +StoragePtr TableFunctionURLCluster::getStorage( + const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, + const std::string & table_name, const String & /*compression_method_*/) const +{ + StoragePtr storage; + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + //On worker node this uri won't contain globs + storage = std::make_shared<StorageURL>( + filename, + StorageID(getDatabaseName(), table_name), + format, + std::nullopt /*format settings*/, + columns, + ConstraintsDescription{}, + String{}, + context, + compression_method, + configuration.headers, + configuration.http_method, + nullptr, + /*distributed_processing=*/ true); + } + else + { + storage = std::make_shared<StorageURLCluster>( + context, + cluster_name, + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration, + structure != "auto"); + } + return storage; +} + +void registerTableFunctionURLCluster(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionURLCluster>(); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.h b/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.h new file mode 100644 index 0000000000..be6992fcaa --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionURLCluster.h @@ -0,0 +1,52 @@ +#pragma once + +#include <TableFunctions/ITableFunctionFileLike.h> +#include <TableFunctions/TableFunctionURL.h> +#include <TableFunctions/ITableFunctionCluster.h> +#include <Storages/StorageURL.h> +#include <Storages/StorageURLCluster.h> +#include <IO/ReadWriteBufferFromHTTP.h> + + +namespace DB +{ + +class Context; + +/** + * urlCluster(cluster, URI, format, structure, compression_method) + * A table function, which allows to process many files from url on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterics + * in url file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. + */ +class TableFunctionURLCluster : public ITableFunctionCluster<TableFunctionURL> +{ +public: + static constexpr auto name = "urlCluster"; + static constexpr auto signature = " - cluster, uri\n" + " - cluster, uri, format\n" + " - cluster, uri, format, structure\n" + " - cluster, uri, format, structure, compression_method\n" + "All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + +protected: + StoragePtr getStorage( + const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, + const std::string & table_name, const String & compression_method_) const override; + + const char * getStorageTypeName() const override { return "URLCluster"; } +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionValues.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionValues.cpp new file mode 100644 index 0000000000..42a1987470 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionValues.cpp @@ -0,0 +1,154 @@ +#include <Common/typeid_cast.h> +#include <Common/Exception.h> + +#include <Core/Block.h> +#include <Storages/StorageValues.h> +#include <DataTypes/DataTypeTuple.h> +#include <DataTypes/getLeastSupertype.h> + +#include <Parsers/ASTExpressionList.h> +#include <Parsers/ASTLiteral.h> + +#include <TableFunctions/TableFunctionValues.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Interpreters/parseColumnsListForTableFunction.h> + +#include <Interpreters/convertFieldToType.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; +} + +static void parseAndInsertValues(MutableColumns & res_columns, const ASTs & args, const Block & sample_block, size_t start, ContextPtr context) +{ + if (res_columns.size() == 1) /// Parsing arguments as Fields + { + for (size_t i = start; i < args.size(); ++i) + { + const auto & [value_field, value_type_ptr] = evaluateConstantExpression(args[i], context); + + Field value = convertFieldToTypeOrThrow(value_field, *sample_block.getByPosition(0).type, value_type_ptr.get()); + res_columns[0]->insert(value); + } + } + else /// Parsing arguments as Tuples + { + for (size_t i = start; i < args.size(); ++i) + { + const auto & [value_field, value_type_ptr] = evaluateConstantExpression(args[i], context); + + const DataTypeTuple * type_tuple = typeid_cast<const DataTypeTuple *>(value_type_ptr.get()); + if (!type_tuple) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table function VALUES requires all but first argument (rows specification) to be either tuples or single values"); + + const Tuple & value_tuple = value_field.safeGet<Tuple>(); + + if (value_tuple.size() != sample_block.columns()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Values size should match with number of columns"); + + const DataTypes & value_types_tuple = type_tuple->getElements(); + for (size_t j = 0; j < value_tuple.size(); ++j) + { + Field value = convertFieldToTypeOrThrow(value_tuple[j], *sample_block.getByPosition(j).type, value_types_tuple[j].get()); + res_columns[j]->insert(value); + } + } + } +} + +DataTypes TableFunctionValues::getTypesFromArgument(const ASTPtr & arg, ContextPtr context) +{ + const auto & [value_field, value_type_ptr] = evaluateConstantExpression(arg, context); + DataTypes types; + if (const DataTypeTuple * type_tuple = typeid_cast<const DataTypeTuple *>(value_type_ptr.get())) + return type_tuple->getElements(); + + return {value_type_ptr}; +} + +void TableFunctionValues::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments", getName()); + + ASTs & args = args_func.at(0)->children; + + if (args.empty()) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires at least 1 argument", getName()); + + const auto & literal = args[0]->as<const ASTLiteral>(); + String value; + String error; + if (args.size() > 1 && literal && literal->value.tryGet(value) && tryParseColumnsListFromString(value, structure, context, error)) + { + has_structure_in_arguments = true; + return; + } + + has_structure_in_arguments = false; + DataTypes data_types = getTypesFromArgument(args[0], context); + for (size_t i = 1; i < args.size(); ++i) + { + auto arg_types = getTypesFromArgument(args[i], context); + if (data_types.size() != arg_types.size()) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot determine common structure for {} function arguments: the amount of columns is differ for different arguments", + getName()); + for (size_t j = 0; j != arg_types.size(); ++j) + data_types[j] = getLeastSupertype(DataTypes{data_types[j], arg_types[j]}); + } + + NamesAndTypesList names_and_types; + for (size_t i = 0; i != data_types.size(); ++i) + names_and_types.emplace_back("c" + std::to_string(i + 1), data_types[i]); + structure = ColumnsDescription(names_and_types); +} + +ColumnsDescription TableFunctionValues::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const +{ + return structure; +} + +StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto columns = getActualTableStructure(context, is_insert_query); + + Block sample_block; + for (const auto & name_type : columns.getOrdinary()) + sample_block.insert({ name_type.type->createColumn(), name_type.type, name_type.name }); + + MutableColumns res_columns = sample_block.cloneEmptyColumns(); + + ASTs & args = ast_function->children.at(0)->children; + + /// Parsing other arguments as values and inserting them into columns + parseAndInsertValues(res_columns, args, sample_block, has_structure_in_arguments ? 1 : 0, context); + + Block res_block = sample_block.cloneWithColumns(std::move(res_columns)); + + auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block); + res->startup(); + return res; +} + +void registerTableFunctionValues(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionValues>({.documentation = {}, .allow_readonly = true}, TableFunctionFactory::CaseInsensitive); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionValues.h b/contrib/clickhouse/src/TableFunctions/TableFunctionValues.h new file mode 100644 index 0000000000..7c87bff835 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionValues.h @@ -0,0 +1,30 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> + +namespace DB +{ +/* values(structure, values...) - creates a temporary storage filling columns with values + * values is case-insensitive table function. + */ +class TableFunctionValues : public ITableFunction +{ +public: + static constexpr auto name = "values"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return true; } +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "Values"; } + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + static DataTypes getTypesFromArgument(const ASTPtr & arg, ContextPtr context); + + ColumnsDescription structure; + bool has_structure_in_arguments; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionView.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionView.cpp new file mode 100644 index 0000000000..2a50fb2d00 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionView.cpp @@ -0,0 +1,74 @@ +#include <Interpreters/InterpreterSelectWithUnionQuery.h> +#include <Interpreters/InterpreterSelectQueryAnalyzer.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTSelectWithUnionQuery.h> +#include <Storages/StorageView.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionView.h> +#include "registerTableFunctions.h" + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + + +const ASTSelectWithUnionQuery & TableFunctionView::getSelectQuery() const +{ + return *create.select; +} + +std::vector<size_t> TableFunctionView::skipAnalysisForArguments(const QueryTreeNodePtr &, ContextPtr) const +{ + return {0}; +} + +void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/) +{ + const auto * function = ast_function->as<ASTFunction>(); + if (function) + { + if (auto * select = function->tryGetQueryArgument()) + { + create.set(create.select, select->clone()); + return; + } + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' requires a query argument.", getName()); +} + +ColumnsDescription TableFunctionView::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const +{ + assert(create.select); + assert(create.children.size() == 1); + assert(create.children[0]->as<ASTSelectWithUnionQuery>()); + + Block sample_block; + + if (context->getSettingsRef().allow_experimental_analyzer) + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.children[0], context); + else + sample_block = InterpreterSelectWithUnionQuery::getSampleBlock(create.children[0], context); + + return ColumnsDescription(sample_block.getNamesAndTypesList()); +} + +StoragePtr TableFunctionView::executeImpl( + const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const +{ + auto columns = getActualTableStructure(context, is_insert_query); + auto res = std::make_shared<StorageView>(StorageID(getDatabaseName(), table_name), create, columns, ""); + res->startup(); + return res; +} + +void registerTableFunctionView(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionView>({.documentation = {}, .allow_readonly = true}); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionView.h b/contrib/clickhouse/src/TableFunctions/TableFunctionView.h new file mode 100644 index 0000000000..c679a1f315 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionView.h @@ -0,0 +1,38 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <Parsers/ASTCreateQuery.h> +#include <base/types.h> + +namespace DB +{ + +/* view(query) + * Turning subquery into a table. + * Useful for passing subquery around. + */ +class TableFunctionView : public ITableFunction +{ +public: + static constexpr auto name = "view"; + + std::string getName() const override { return name; } + + const ASTSelectWithUnionQuery & getSelectQuery() const; + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "View"; } + + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + ASTCreateQuery create; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.cpp new file mode 100644 index 0000000000..d7944df1b2 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.cpp @@ -0,0 +1,126 @@ +#include <Interpreters/InterpreterSelectWithUnionQuery.h> +#include <Interpreters/InterpreterSelectQueryAnalyzer.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/ASTSelectWithUnionQuery.h> +#include <Storages/StorageNull.h> +#include <Storages/StorageView.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <TableFunctions/TableFunctionViewIfPermitted.h> +#include <Interpreters/parseColumnsListForTableFunction.h> +#include <Interpreters/evaluateConstantExpression.h> +#include "registerTableFunctions.h" + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; + extern const int ACCESS_DENIED; +} + + +const ASTSelectWithUnionQuery & TableFunctionViewIfPermitted::getSelectQuery() const +{ + return *create.select; +} + +std::vector<size_t> TableFunctionViewIfPermitted::skipAnalysisForArguments(const QueryTreeNodePtr &, ContextPtr) const +{ + return {0}; +} + +void TableFunctionViewIfPermitted::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + const auto * function = ast_function->as<ASTFunction>(); + if (!function || !function->arguments || (function->arguments->children.size() != 2)) + throw Exception( + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function '{}' requires two arguments: a SELECT query and a table function", + getName()); + + const auto & arguments = function->arguments->children; + auto * select = arguments[0]->as<ASTSelectWithUnionQuery>(); + if (!select) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' requires a SELECT query as its first argument", getName()); + create.set(create.select, select->clone()); + + else_ast = arguments[1]; + if (!else_ast->as<ASTFunction>()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' requires a table function as its second argument", getName()); + else_table_function = TableFunctionFactory::instance().get(else_ast, context); +} + +ColumnsDescription TableFunctionViewIfPermitted::getActualTableStructure(ContextPtr context, bool is_insert_query) const +{ + return else_table_function->getActualTableStructure(context, is_insert_query); +} + +StoragePtr TableFunctionViewIfPermitted::executeImpl( + const ASTPtr & /* ast_function */, ContextPtr context, const std::string & table_name, ColumnsDescription /* cached_columns */, bool is_insert_query) const +{ + StoragePtr storage; + auto columns = getActualTableStructure(context, is_insert_query); + + if (isPermitted(context, columns)) + { + storage = std::make_shared<StorageView>(StorageID(getDatabaseName(), table_name), create, columns, ""); + } + else + { + storage = else_table_function->execute(else_ast, context, table_name); + } + + storage->startup(); + return storage; +} + +bool TableFunctionViewIfPermitted::isPermitted(const ContextPtr & context, const ColumnsDescription & else_columns) const +{ + Block sample_block; + + try + { + if (context->getSettingsRef().allow_experimental_analyzer) + { + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(create.children[0], context); + } + else + { + /// Will throw ACCESS_DENIED if the current user is not allowed to execute the SELECT query. + sample_block = InterpreterSelectWithUnionQuery::getSampleBlock(create.children[0], context); + } + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::ACCESS_DENIED) + return false; + throw; + } + + /// We check that columns match only if permitted (otherwise we could reveal the structure to an user who must not know it). + ColumnsDescription columns{sample_block.getNamesAndTypesList()}; + if (columns != else_columns) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' requires a SELECT query with the result columns matching a table function after 'ELSE'. " + "Currently the result columns of the SELECT query are {}, and the table function after 'ELSE' gives {}", + getName(), + columns.toString(), + else_columns.toString()); + } + + return true; +} + +void registerTableFunctionViewIfPermitted(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionViewIfPermitted>({.documentation = {}, .allow_readonly = true}); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.h b/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.h new file mode 100644 index 0000000000..bee4e15bfa --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionViewIfPermitted.h @@ -0,0 +1,40 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <Parsers/ASTCreateQuery.h> +#include <base/types.h> + +namespace DB +{ + +/* viewIfPermitted(query ELSE null('structure')) + * Works as "view(query)" if the current user has the permissions required to execute "query"; works as "null('structure')" otherwise. + */ +class TableFunctionViewIfPermitted : public ITableFunction +{ +public: + static constexpr auto name = "viewIfPermitted"; + + std::string getName() const override { return name; } + + const ASTSelectWithUnionQuery & getSelectQuery() const; + +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + + const char * getStorageTypeName() const override { return "ViewIfPermitted"; } + + std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; + + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; + + bool isPermitted(const ContextPtr & context, const ColumnsDescription & else_columns) const; + + ASTCreateQuery create; + ASTPtr else_ast; + TableFunctionPtr else_table_function; +}; + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.cpp b/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.cpp new file mode 100644 index 0000000000..eb93626590 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.cpp @@ -0,0 +1,83 @@ +#include <TableFunctions/ITableFunction.h> +#include <TableFunctions/TableFunctionZeros.h> +#include <TableFunctions/TableFunctionFactory.h> +#include <Parsers/ASTFunction.h> +#include <Storages/checkAndGetLiteralArgument.h> +#include <Storages/System/StorageSystemZeros.h> +#include <DataTypes/DataTypesNumber.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Interpreters/Context.h> +#include "registerTableFunctions.h" + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + + +template <bool multithreaded> +ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const +{ + /// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418 + return ColumnsDescription{{{"zero", std::make_shared<DataTypeUInt8>()}}}; +} + +template <bool multithreaded> +StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const +{ + if (const auto * function = ast_function->as<ASTFunction>()) + { + auto arguments = function->arguments->children; + + if (arguments.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length'.", getName()); + + + UInt64 length = evaluateArgument(context, arguments[0]); + + auto res = std::make_shared<StorageSystemZeros>(StorageID(getDatabaseName(), table_name), multithreaded, length); + res->startup(); + return res; + } + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'limit'.", getName()); +} + +void registerTableFunctionZeros(TableFunctionFactory & factory) +{ + factory.registerFunction<TableFunctionZeros<true>>({.documentation = { + .description=R"( + Generates a stream of zeros (a table with one column 'zero' of type 'UInt8') of specified size. + This table function is used in performance tests, where you want to spend as little time as possible to data generation while testing some other parts of queries. + In contrast to the `zeros_mt`, this table function is using single thread for data generation. + Example: + [example:1] + This query will test the speed of `randomPrintableASCII` function using single thread. + See also the `system.zeros` table.)", + .examples={{"1", "SELECT count() FROM zeros(100000000) WHERE NOT ignore(randomPrintableASCII(10))", ""}} + }}); + + factory.registerFunction<TableFunctionZeros<false>>({.documentation = { + .description=R"( + Generates a stream of zeros (a table with one column 'zero' of type 'UInt8') of specified size. + This table function is used in performance tests, where you want to spend as little time as possible to data generation while testing some other parts of queries. + In contrast to the `zeros`, this table function is using multiple threads for data generation, according to the `max_threads` setting. + Example: + [example:1] + This query will test the speed of `randomPrintableASCII` function using multiple threads. + See also the `system.zeros` table. + )", + .examples={{"1", "SELECT count() FROM zeros_mt(1000000000) WHERE NOT ignore(randomPrintableASCII(10))", ""}} +}}); +} + +template <bool multithreaded> +UInt64 TableFunctionZeros<multithreaded>::evaluateArgument(ContextPtr context, ASTPtr & argument) const +{ + return checkAndGetLiteralArgument<UInt64>(evaluateConstantExpressionOrIdentifierAsLiteral(argument, context), "length"); +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.h b/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.h new file mode 100644 index 0000000000..07d523ee37 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/TableFunctionZeros.h @@ -0,0 +1,31 @@ +#pragma once + +#include <TableFunctions/ITableFunction.h> +#include <base/types.h> + + +namespace DB +{ + +/* zeros(limit), zeros_mt(limit) + * - the same as SELECT zero FROM system.zeros LIMIT limit. + * Used for testing purposes, as a simple example of table function. + */ +template <bool multithreaded> +class TableFunctionZeros : public ITableFunction +{ +public: + static constexpr auto name = multithreaded ? "zeros_mt" : "zeros"; + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return true; } +private: + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; + const char * getStorageTypeName() const override { return "SystemZeros"; } + + UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const; + + ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; +}; + + +} diff --git a/contrib/clickhouse/src/TableFunctions/registerTableFunctions.cpp b/contrib/clickhouse/src/TableFunctions/registerTableFunctions.cpp new file mode 100644 index 0000000000..9383b6ac93 --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/registerTableFunctions.cpp @@ -0,0 +1,84 @@ +#include "registerTableFunctions.h" +#include <TableFunctions/TableFunctionFactory.h> + + +namespace DB +{ +void registerTableFunctions() +{ + auto & factory = TableFunctionFactory::instance(); + + registerTableFunctionMerge(factory); + registerTableFunctionRemote(factory); + registerTableFunctionNumbers(factory); + registerTableFunctionNull(factory); + registerTableFunctionZeros(factory); + registerTableFunctionExecutable(factory); + registerTableFunctionFile(factory); + registerTableFunctionURL(factory); + registerTableFunctionURLCluster(factory); + registerTableFunctionValues(factory); + registerTableFunctionInput(factory); + registerTableFunctionGenerate(factory); + // registerTableFunctionMongoDB(factory); + registerTableFunctionRedis(factory); + + registerTableFunctionMeiliSearch(factory); + +#if USE_AWS_S3 + registerTableFunctionS3(factory); + registerTableFunctionS3Cluster(factory); + registerTableFunctionCOS(factory); + registerTableFunctionOSS(factory); + registerTableFunctionGCS(factory); + registerTableFunctionHudi(factory); +#if USE_PARQUET + registerTableFunctionDeltaLake(factory); +#endif +#if USE_AVRO + registerTableFunctionIceberg(factory); +#endif + +#endif + +#if USE_HDFS + registerTableFunctionHDFS(factory); + registerTableFunctionHDFSCluster(factory); +#endif + +#if USE_HIVE + registerTableFunctionHive(factory); +#endif + + registerTableFunctionODBC(factory); + registerTableFunctionJDBC(factory); + + registerTableFunctionView(factory); + registerTableFunctionViewIfPermitted(factory); + +#if USE_MYSQL + registerTableFunctionMySQL(factory); +#endif + +#if USE_LIBPQXX + registerTableFunctionPostgreSQL(factory); +#endif + +#if USE_SQLITE + registerTableFunctionSQLite(factory); +#endif + + registerTableFunctionDictionary(factory); + + registerTableFunctionFormat(factory); + registerTableFunctionExplain(factory); + +#if USE_AZURE_BLOB_STORAGE + registerTableFunctionAzureBlobStorage(factory); + registerTableFunctionAzureBlobStorageCluster(factory); +#endif + + +} + +} diff --git a/contrib/clickhouse/src/TableFunctions/registerTableFunctions.h b/contrib/clickhouse/src/TableFunctions/registerTableFunctions.h new file mode 100644 index 0000000000..2bbc11c99d --- /dev/null +++ b/contrib/clickhouse/src/TableFunctions/registerTableFunctions.h @@ -0,0 +1,80 @@ +#pragma once + +#include "clickhouse_config.h" + +namespace DB +{ +class TableFunctionFactory; +void registerTableFunctionMerge(TableFunctionFactory & factory); +void registerTableFunctionRemote(TableFunctionFactory & factory); +void registerTableFunctionNumbers(TableFunctionFactory & factory); +void registerTableFunctionNull(TableFunctionFactory & factory); +void registerTableFunctionZeros(TableFunctionFactory & factory); +void registerTableFunctionExecutable(TableFunctionFactory & factory); +void registerTableFunctionFile(TableFunctionFactory & factory); +void registerTableFunctionURL(TableFunctionFactory & factory); +void registerTableFunctionURLCluster(TableFunctionFactory & factory); +void registerTableFunctionValues(TableFunctionFactory & factory); +void registerTableFunctionInput(TableFunctionFactory & factory); +void registerTableFunctionGenerate(TableFunctionFactory & factory); +void registerTableFunctionMongoDB(TableFunctionFactory & factory); +void registerTableFunctionRedis(TableFunctionFactory & factory); + +void registerTableFunctionMeiliSearch(TableFunctionFactory & factory); + +#if USE_AWS_S3 +void registerTableFunctionS3(TableFunctionFactory & factory); +void registerTableFunctionS3Cluster(TableFunctionFactory & factory); +void registerTableFunctionCOS(TableFunctionFactory & factory); +void registerTableFunctionOSS(TableFunctionFactory & factory); +void registerTableFunctionGCS(TableFunctionFactory & factory); +void registerTableFunctionHudi(TableFunctionFactory & factory); +#if USE_PARQUET +void registerTableFunctionDeltaLake(TableFunctionFactory & factory); +#endif +#if USE_AVRO +void registerTableFunctionIceberg(TableFunctionFactory & factory); +#endif +#endif + +#if USE_HDFS +void registerTableFunctionHDFS(TableFunctionFactory & factory); +void registerTableFunctionHDFSCluster(TableFunctionFactory & factory); +#endif + +#if USE_HIVE +void registerTableFunctionHive(TableFunctionFactory & factory); +#endif + +void registerTableFunctionODBC(TableFunctionFactory & factory); +void registerTableFunctionJDBC(TableFunctionFactory & factory); + +void registerTableFunctionView(TableFunctionFactory & factory); +void registerTableFunctionViewIfPermitted(TableFunctionFactory & factory); + +#if USE_MYSQL +void registerTableFunctionMySQL(TableFunctionFactory & factory); +#endif + +#if USE_LIBPQXX +void registerTableFunctionPostgreSQL(TableFunctionFactory & factory); +#endif + +#if USE_SQLITE +void registerTableFunctionSQLite(TableFunctionFactory & factory); +#endif + +void registerTableFunctionDictionary(TableFunctionFactory & factory); + +void registerTableFunctionFormat(TableFunctionFactory & factory); + +void registerTableFunctionExplain(TableFunctionFactory & factory); + +#if USE_AZURE_BLOB_STORAGE +void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory); +void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory); +#endif + +void registerTableFunctions(); + +} |