summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp')
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp257
1 files changed, 257 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp b/contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp
new file mode 100644
index 00000000000..ca142479ff1
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp
@@ -0,0 +1,257 @@
+#include "ExternalUserDefinedExecutableFunctionsLoader.h"
+
+#include <boost/algorithm/string/split.hpp>
+#include <Common/StringUtils/StringUtils.h>
+
+#include <DataTypes/DataTypeFactory.h>
+
+#include <Functions/UserDefined/UserDefinedExecutableFunction.h>
+#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
+#include <Functions/FunctionFactory.h>
+#include <AggregateFunctions/AggregateFunctionFactory.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int BAD_ARGUMENTS;
+ extern const int FUNCTION_ALREADY_EXISTS;
+ extern const int UNSUPPORTED_METHOD;
+ extern const int TYPE_MISMATCH;
+}
+
+namespace
+{
+ /** Extract parameters from command and replace them with parameter names placeholders.
+ * Example: test_script.py {parameter_name: UInt64}
+ * After run function: test_script.py {parameter_name}
+ */
+ std::vector<UserDefinedExecutableFunctionParameter> extractParametersFromCommand(String & command_value)
+ {
+ std::vector<UserDefinedExecutableFunctionParameter> parameters;
+ std::unordered_map<std::string_view, DataTypePtr> parameter_name_to_type;
+
+ size_t previous_parameter_match_position = 0;
+ while (true)
+ {
+ auto start_parameter_pos = command_value.find('{', previous_parameter_match_position);
+ if (start_parameter_pos == std::string::npos)
+ break;
+
+ auto end_parameter_pos = command_value.find('}', start_parameter_pos);
+ if (end_parameter_pos == std::string::npos)
+ break;
+
+ previous_parameter_match_position = start_parameter_pos + 1;
+
+ auto semicolon_pos = command_value.find(':', start_parameter_pos);
+ if (semicolon_pos == std::string::npos)
+ break;
+ else if (semicolon_pos > end_parameter_pos)
+ continue;
+
+ std::string parameter_name(command_value.data() + start_parameter_pos + 1, command_value.data() + semicolon_pos);
+ trim(parameter_name);
+
+ bool is_identifier = std::all_of(parameter_name.begin(), parameter_name.end(), [](char character)
+ {
+ return isWordCharASCII(character);
+ });
+
+ if (parameter_name.empty() && !is_identifier)
+ continue;
+
+ std::string data_type_name(command_value.data() + semicolon_pos + 1, command_value.data() + end_parameter_pos);
+ trim(data_type_name);
+
+ if (data_type_name.empty())
+ continue;
+
+ DataTypePtr parameter_data_type = DataTypeFactory::instance().get(data_type_name);
+ auto parameter_name_to_type_it = parameter_name_to_type.find(parameter_name);
+ if (parameter_name_to_type_it != parameter_name_to_type.end() && !parameter_data_type->equals(*parameter_name_to_type_it->second))
+ throw Exception(ErrorCodes::TYPE_MISMATCH,
+ "Multiple parameters with same name {} does not have same type. Expected {}. Actual {}",
+ parameter_name,
+ parameter_name_to_type_it->second->getName(),
+ parameter_data_type->getName());
+
+ size_t replace_size = end_parameter_pos - start_parameter_pos - 1;
+ command_value.replace(start_parameter_pos + 1, replace_size, parameter_name);
+ previous_parameter_match_position = start_parameter_pos + parameter_name.size();
+
+ if (parameter_name_to_type_it == parameter_name_to_type.end())
+ {
+ parameters.emplace_back(UserDefinedExecutableFunctionParameter{std::move(parameter_name), std::move(parameter_data_type)});
+ auto & last_parameter = parameters.back();
+ parameter_name_to_type.emplace(last_parameter.name, last_parameter.type);
+ }
+ }
+
+ return parameters;
+ }
+}
+
+ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_)
+ : ExternalLoader("external user defined function", &Poco::Logger::get("ExternalUserDefinedExecutableFunctionsLoader"))
+ , WithContext(global_context_)
+{
+ setConfigSettings({"function", "name", "database", "uuid"});
+ enableAsyncLoading(false);
+ enablePeriodicUpdates(true);
+ enableAlwaysLoadEverything(true);
+}
+
+ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr ExternalUserDefinedExecutableFunctionsLoader::getUserDefinedFunction(const std::string & user_defined_function_name) const
+{
+ return std::static_pointer_cast<const UserDefinedExecutableFunction>(load(user_defined_function_name));
+}
+
+ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr ExternalUserDefinedExecutableFunctionsLoader::tryGetUserDefinedFunction(const std::string & user_defined_function_name) const
+{
+ return std::static_pointer_cast<const UserDefinedExecutableFunction>(tryLoad(user_defined_function_name));
+}
+
+void ExternalUserDefinedExecutableFunctionsLoader::reloadFunction(const std::string & user_defined_function_name) const
+{
+ loadOrReload(user_defined_function_name);
+}
+
+ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create(const std::string & name,
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & key_in_config,
+ const std::string &) const
+{
+ if (FunctionFactory::instance().hasNameOrAlias(name))
+ throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", name);
+
+ if (AggregateFunctionFactory::instance().hasNameOrAlias(name))
+ throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", name);
+
+ String type = config.getString(key_in_config + ".type");
+
+ bool is_executable_pool = false;
+
+ if (type == "executable")
+ is_executable_pool = false;
+ else if (type == "executable_pool")
+ is_executable_pool = true;
+ else
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "Wrong user defined function type expected 'executable' or 'executable_pool' actual {}",
+ type);
+
+ bool execute_direct = config.getBool(key_in_config + ".execute_direct", true);
+
+ String command_value = config.getString(key_in_config + ".command");
+ std::vector<UserDefinedExecutableFunctionParameter> parameters = extractParametersFromCommand(command_value);
+
+ if (!execute_direct && !parameters.empty())
+ throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Parameters are not supported if executable user defined function is not direct");
+
+ std::vector<String> command_arguments;
+
+ if (execute_direct)
+ {
+ boost::split(command_arguments, command_value, [](char c) { return c == ' '; });
+
+ command_value = std::move(command_arguments[0]);
+ command_arguments.erase(command_arguments.begin());
+ }
+
+ String format = config.getString(key_in_config + ".format");
+ DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
+ String result_name = "result";
+ if (config.has(key_in_config + ".return_name"))
+ result_name = config.getString(key_in_config + ".return_name");
+
+ bool send_chunk_header = config.getBool(key_in_config + ".send_chunk_header", false);
+ size_t command_termination_timeout_seconds = config.getUInt64(key_in_config + ".command_termination_timeout", 10);
+ size_t command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000);
+ size_t command_write_timeout_milliseconds = config.getUInt64(key_in_config + ".command_write_timeout", 10000);
+ ExternalCommandStderrReaction stderr_reaction
+ = parseExternalCommandStderrReaction(config.getString(key_in_config + ".stderr_reaction", "none"));
+ bool check_exit_code = config.getBool(key_in_config + ".check_exit_code", true);
+
+ size_t pool_size = 0;
+ size_t max_command_execution_time = 0;
+
+ if (is_executable_pool)
+ {
+ pool_size = config.getUInt64(key_in_config + ".pool_size", 16);
+ max_command_execution_time = config.getUInt64(key_in_config + ".max_command_execution_time", 10);
+
+ size_t max_execution_time_seconds = static_cast<size_t>(getContext()->getSettings().max_execution_time.totalSeconds());
+ if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
+ max_command_execution_time = max_execution_time_seconds;
+ }
+
+ ExternalLoadableLifetime lifetime;
+
+ if (config.has(key_in_config + ".lifetime"))
+ lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime");
+
+ std::vector<UserDefinedExecutableFunctionArgument> arguments;
+
+ Poco::Util::AbstractConfiguration::Keys config_elems;
+ config.keys(key_in_config, config_elems);
+
+ size_t argument_number = 1;
+
+ for (const auto & config_elem : config_elems)
+ {
+ if (!startsWith(config_elem, "argument"))
+ continue;
+
+ UserDefinedExecutableFunctionArgument argument;
+
+ const auto argument_prefix = key_in_config + '.' + config_elem + '.';
+
+ argument.type = DataTypeFactory::instance().get(config.getString(argument_prefix + "type"));
+
+ if (config.has(argument_prefix + "name"))
+ argument.name = config.getString(argument_prefix + "name");
+ else
+ argument.name = "c" + std::to_string(argument_number);
+
+ ++argument_number;
+ arguments.emplace_back(std::move(argument));
+ }
+
+ if (is_executable_pool && !parameters.empty())
+ throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
+ "Executable user defined functions with `executable_pool` type does not support parameters");
+
+ UserDefinedExecutableFunctionConfiguration function_configuration
+ {
+ .name = name,
+ .command = std::move(command_value),
+ .command_arguments = std::move(command_arguments),
+ .arguments = std::move(arguments),
+ .parameters = std::move(parameters),
+ .result_type = std::move(result_type),
+ .result_name = std::move(result_name),
+ };
+
+ ShellCommandSourceCoordinator::Configuration shell_command_coordinator_configration
+ {
+ .format = std::move(format),
+ .command_termination_timeout_seconds = command_termination_timeout_seconds,
+ .command_read_timeout_milliseconds = command_read_timeout_milliseconds,
+ .command_write_timeout_milliseconds = command_write_timeout_milliseconds,
+ .stderr_reaction = stderr_reaction,
+ .check_exit_code = check_exit_code,
+ .pool_size = pool_size,
+ .max_command_execution_time_seconds = max_command_execution_time,
+ .is_executable_pool = is_executable_pool,
+ .send_chunk_header = send_chunk_header,
+ .execute_direct = execute_direct
+ };
+
+ auto coordinator = std::make_shared<ShellCommandSourceCoordinator>(shell_command_coordinator_configration);
+ return std::make_shared<UserDefinedExecutableFunction>(function_configuration, std::move(coordinator), lifetime);
+}
+
+}