diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp new file mode 100644 index 00000000000..3f3cfc4c8e3 --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp @@ -0,0 +1,289 @@ +#include "UserDefinedExecutableFunctionFactory.h" + +#include <filesystem> + +#include <Common/filesystemHelpers.h> +#include <Common/FieldVisitorToString.h> +#include <DataTypes/FieldToDataType.h> + +#include <Processors/Sources/ShellCommandSource.h> +#include <Processors/Sources/SourceFromSingleChunk.h> +#include <Formats/formatBlock.h> + +#include <Functions/FunctionFactory.h> +#include <Functions/FunctionHelpers.h> +#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h> +#include <AggregateFunctions/AggregateFunctionFactory.h> +#include <Interpreters/convertFieldToType.h> +#include <Interpreters/Context.h> +#include <Interpreters/castColumn.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNSUPPORTED_METHOD; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + +class UserDefinedFunction final : public IFunction +{ +public: + explicit UserDefinedFunction( + ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function_, + ContextPtr context_, + Array parameters_) + : executable_function(std::move(executable_function_)) + , context(std::move(context_)) + { + const auto & configuration = executable_function->getConfiguration(); + size_t command_parameters_size = configuration.parameters.size(); + if (command_parameters_size != parameters_.size()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Executable user defined function {} number of parameters does not match. Expected {}. Actual {}", + configuration.name, + command_parameters_size, + parameters_.size()); + + command_with_parameters = configuration.command; + command_arguments_with_parameters = configuration.command_arguments; + + for (size_t i = 0; i < command_parameters_size; ++i) + { + const auto & command_parameter = configuration.parameters[i]; + const auto & parameter_value = parameters_[i]; + auto converted_parameter = convertFieldToTypeOrThrow(parameter_value, *command_parameter.type); + auto parameter_placeholder = "{" + command_parameter.name + "}"; + + auto parameter_value_string = applyVisitor(FieldVisitorToString(), converted_parameter); + bool find_placedholder = false; + + auto try_replace_parameter_placeholder_with_value = [&](std::string & command_part) + { + size_t previous_parameter_placeholder_position = 0; + + while (true) + { + auto parameter_placeholder_position = command_part.find(parameter_placeholder, previous_parameter_placeholder_position); + if (parameter_placeholder_position == std::string::npos) + break; + + size_t parameter_placeholder_size = parameter_placeholder.size(); + command_part.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); + previous_parameter_placeholder_position = parameter_placeholder_position + parameter_value_string.size(); + find_placedholder = true; + } + + find_placedholder = true; + }; + + for (auto & command_argument : command_arguments_with_parameters) + try_replace_parameter_placeholder_with_value(command_argument); + + try_replace_parameter_placeholder_with_value(command_with_parameters); + + if (!find_placedholder) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Executable user defined function {} no placeholder for parameter {}", + configuration.name, + command_parameter.name); + } + } + } + + String getName() const override { return executable_function->getConfiguration().name; } + + bool isVariadic() const override { return false; } + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; } + size_t getNumberOfArguments() const override { return executable_function->getConfiguration().arguments.size(); } + + bool useDefaultImplementationForConstants() const override { return true; } + bool useDefaultImplementationForNulls() const override { return false; } + bool isDeterministic() const override { return false; } + bool isDeterministicInScopeOfQuery() const override { return false; } + + DataTypePtr getReturnTypeImpl(const DataTypes &) const override + { + const auto & configuration = executable_function->getConfiguration(); + return configuration.result_type; + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + { + /// Do not start user defined script during query analysis. Because user script startup could be heavy. + if (input_rows_count == 0) + return result_type->createColumn(); + + auto coordinator = executable_function->getCoordinator(); + const auto & coordinator_configuration = coordinator->getConfiguration(); + const auto & configuration = executable_function->getConfiguration(); + + String command = command_with_parameters; + + if (coordinator_configuration.execute_direct) + { + auto user_scripts_path = context->getUserScriptsPath(); + auto script_path = user_scripts_path + '/' + command; + + if (!fileOrSymlinkPathStartsWith(script_path, user_scripts_path)) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Executable file {} must be inside user scripts folder {}", + command, + user_scripts_path); + + if (!FS::exists(script_path)) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Executable file {} does not exist inside user scripts folder {}", + command, + user_scripts_path); + + if (!FS::canExecute(script_path)) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Executable file {} is not executable inside user scripts folder {}", + command, + user_scripts_path); + + command = std::move(script_path); + } + + size_t argument_size = arguments.size(); + auto arguments_copy = arguments; + + for (size_t i = 0; i < argument_size; ++i) + { + auto & column_with_type = arguments_copy[i]; + column_with_type.column = column_with_type.column->convertToFullColumnIfConst(); + + const auto & argument = configuration.arguments[i]; + column_with_type.name = argument.name; + + const auto & argument_type = argument.type; + + if (areTypesEqual(arguments_copy[i].type, argument_type)) + continue; + + ColumnWithTypeAndName column_to_cast = {column_with_type.column, column_with_type.type, column_with_type.name}; + column_with_type.column = castColumnAccurate(column_to_cast, argument_type); + column_with_type.type = argument_type; + + column_with_type = std::move(column_to_cast); + } + + ColumnWithTypeAndName result(result_type, configuration.result_name); + Block result_block({result}); + + Block arguments_block(arguments_copy); + auto source = std::make_shared<SourceFromSingleChunk>(std::move(arguments_block)); + auto shell_input_pipe = Pipe(std::move(source)); + + ShellCommandSourceConfiguration shell_command_source_configuration; + + if (coordinator_configuration.is_executable_pool) + { + shell_command_source_configuration.read_fixed_number_of_rows = true; + shell_command_source_configuration.number_of_rows_to_read = input_rows_count; + } + + Pipes shell_input_pipes; + shell_input_pipes.emplace_back(std::move(shell_input_pipe)); + + Pipe pipe = coordinator->createPipe( + command, + command_arguments_with_parameters, + std::move(shell_input_pipes), + result_block, + context, + shell_command_source_configuration); + + QueryPipeline pipeline(std::move(pipe)); + PullingPipelineExecutor executor(pipeline); + + auto result_column = result_type->createColumn(); + result_column->reserve(input_rows_count); + + Block block; + while (executor.pull(block)) + { + const auto & result_column_to_add = *block.safeGetByPosition(0).column; + result_column->insertRangeFrom(result_column_to_add, 0, result_column_to_add.size()); + } + + size_t result_column_size = result_column->size(); + if (result_column_size != input_rows_count) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Function {}: wrong result, expected {} row(s), actual {}", + quoteString(getName()), + input_rows_count, + result_column_size); + + return result_column; + } + +private: + ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function; + ContextPtr context; + String command_with_parameters; + std::vector<std::string> command_arguments_with_parameters; +}; + +} + +UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::instance() +{ + static UserDefinedExecutableFunctionFactory result; + return result; +} + +FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context, Array parameters) +{ + const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); + auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(loader.load(function_name)); + auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters)); + return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function)); +} + +FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context, Array parameters) +{ + const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); + auto load_result = loader.getLoadResult(function_name); + + if (load_result.object) + { + auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(load_result.object); + auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters)); + return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function)); + } + + return nullptr; +} + +bool UserDefinedExecutableFunctionFactory::has(const String & function_name, ContextPtr context) +{ + const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); + auto load_result = loader.getLoadResult(function_name); + + bool result = load_result.object != nullptr; + return result; +} + +std::vector<String> UserDefinedExecutableFunctionFactory::getRegisteredNames(ContextPtr context) +{ + const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); + auto loaded_objects = loader.getLoadedObjects(); + + std::vector<std::string> registered_names; + registered_names.reserve(loaded_objects.size()); + + for (auto & loaded_object : loaded_objects) + registered_names.emplace_back(loaded_object->getLoadableName()); + + return registered_names; +} + +} |
