summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp')
-rw-r--r--contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp680
1 files changed, 680 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp b/contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp
new file mode 100644
index 00000000000..231e8b6fa77
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/FunctionsTimeWindow.cpp
@@ -0,0 +1,680 @@
+#include <numeric>
+
+#include <Columns/ColumnsNumber.h>
+#include <Columns/ColumnsDateTime.h>
+#include <Columns/ColumnTuple.h>
+#include <DataTypes/DataTypeDate.h>
+#include <DataTypes/DataTypeDateTime.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeTuple.h>
+#include <Functions/extractTimeZoneFromFunctionArguments.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/FunctionHelpers.h>
+#include <Functions/FunctionsTimeWindow.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+ extern const int ILLEGAL_COLUMN;
+ extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+ extern const int ARGUMENT_OUT_OF_BOUND;
+ extern const int SYNTAX_ERROR;
+}
+
+namespace
+{
+ std::tuple<IntervalKind::Kind, Int64>
+ dispatchForIntervalColumns(const ColumnWithTypeAndName & interval_column, const String & function_name)
+ {
+ const auto * interval_type = checkAndGetDataType<DataTypeInterval>(interval_column.type.get());
+ if (!interval_type)
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of argument of function {}",
+ interval_column.name, function_name);
+ const auto * interval_column_const_int64 = checkAndGetColumnConst<ColumnInt64>(interval_column.column.get());
+ if (!interval_column_const_int64)
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of argument of function {}",
+ interval_column.name, function_name);
+ Int64 num_units = interval_column_const_int64->getValue<Int64>();
+ if (num_units <= 0)
+ throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Value for column {} of function {} must be positive",
+ interval_column.name, function_name);
+
+ return {interval_type->getKind(), num_units};
+ }
+
+ ColumnPtr executeWindowBound(const ColumnPtr & column, int index, const String & function_name)
+ {
+ if (const ColumnTuple * col_tuple = checkAndGetColumn<ColumnTuple>(column.get()); col_tuple)
+ {
+ if (!checkColumn<ColumnVector<UInt32>>(*col_tuple->getColumnPtr(index)))
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column for first argument of function {}. "
+ "Must be a Tuple(DataTime, DataTime)", function_name);
+ return col_tuple->getColumnPtr(index);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column for first argument of function {}. "
+ "Must be Tuple", function_name);
+ }
+ }
+
+ void checkFirstArgument(const ColumnWithTypeAndName & argument, const String & function_name)
+ {
+ if (!isDateTime(argument.type))
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}. "
+ "Should be a date with time", argument.type->getName(), function_name);
+ }
+
+ void checkIntervalArgument(const ColumnWithTypeAndName & argument, const String & function_name, IntervalKind & interval_kind, bool & result_type_is_date)
+ {
+ const auto * interval_type = checkAndGetDataType<DataTypeInterval>(argument.type.get());
+ if (!interval_type)
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}. "
+ "Should be an interval of time", argument.type->getName(), function_name);
+ interval_kind = interval_type->getKind();
+ result_type_is_date = (interval_type->getKind() == IntervalKind::Year) || (interval_type->getKind() == IntervalKind::Quarter)
+ || (interval_type->getKind() == IntervalKind::Month) || (interval_type->getKind() == IntervalKind::Week);
+ }
+
+ void checkIntervalArgument(const ColumnWithTypeAndName & argument, const String & function_name, bool & result_type_is_date)
+ {
+ IntervalKind interval_kind;
+ checkIntervalArgument(argument, function_name, interval_kind, result_type_is_date);
+ }
+
+ void checkTimeZoneArgument(
+ const ColumnWithTypeAndName & argument,
+ const String & function_name)
+ {
+ if (!WhichDataType(argument.type).isString())
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}. "
+ "This argument is optional and must be a constant string with timezone name",
+ argument.type->getName(), function_name);
+ }
+
+ bool checkIntervalOrTimeZoneArgument(const ColumnWithTypeAndName & argument, const String & function_name, IntervalKind & interval_kind, bool & result_type_is_date)
+ {
+ if (WhichDataType(argument.type).isString())
+ {
+ checkTimeZoneArgument(argument, function_name);
+ return false;
+ }
+ checkIntervalArgument(argument, function_name, interval_kind, result_type_is_date);
+ return true;
+ }
+}
+
+template <>
+struct TimeWindowImpl<TUMBLE>
+{
+ static constexpr auto name = "tumble";
+
+ [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ bool result_type_is_date;
+
+ if (arguments.size() == 2)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, result_type_is_date);
+ }
+ else if (arguments.size() == 3)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, result_type_is_date);
+ checkTimeZoneArgument(arguments.at(2), function_name);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
+ function_name, arguments.size());
+ }
+
+ DataTypePtr data_type = nullptr;
+ if (result_type_is_date)
+ data_type = std::make_shared<DataTypeDate>();
+ else
+ data_type = std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 0, false));
+
+ return std::make_shared<DataTypeTuple>(DataTypes{data_type, data_type});
+ }
+
+ static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto & interval_column = arguments[1];
+ const auto & from_datatype = *time_column.type.get();
+ const auto which_type = WhichDataType(from_datatype);
+ const auto * time_column_vec = checkAndGetColumn<ColumnDateTime>(time_column.column.get());
+ const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(arguments, 2, 0);
+ if (!which_type.isDateTime() || !time_column_vec)
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column {} of function {}. "
+ "Must contain dates or dates with time", time_column.name, function_name);
+
+ auto interval = dispatchForIntervalColumns(interval_column, function_name);
+
+ switch (std::get<0>(interval))
+ {
+ //TODO: add proper support for fractional seconds
+// case IntervalKind::Nanosecond:
+// return executeTumble<UInt32, IntervalKind::Nanosecond>(*time_column_vec, std::get<1>(interval), time_zone);
+// case IntervalKind::Microsecond:
+// return executeTumble<UInt32, IntervalKind::Microsecond>(*time_column_vec, std::get<1>(interval), time_zone);
+// case IntervalKind::Millisecond:
+// return executeTumble<UInt32, IntervalKind::Millisecond>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Second:
+ return executeTumble<UInt32, IntervalKind::Second>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Minute:
+ return executeTumble<UInt32, IntervalKind::Minute>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Hour:
+ return executeTumble<UInt32, IntervalKind::Hour>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Day:
+ return executeTumble<UInt32, IntervalKind::Day>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Week:
+ return executeTumble<UInt16, IntervalKind::Week>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Month:
+ return executeTumble<UInt16, IntervalKind::Month>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Quarter:
+ return executeTumble<UInt16, IntervalKind::Quarter>(*time_column_vec, std::get<1>(interval), time_zone);
+ case IntervalKind::Year:
+ return executeTumble<UInt16, IntervalKind::Year>(*time_column_vec, std::get<1>(interval), time_zone);
+ default:
+ throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
+ }
+ UNREACHABLE();
+ }
+
+ template <typename ToType, IntervalKind::Kind unit>
+ static ColumnPtr executeTumble(const ColumnDateTime & time_column, UInt64 num_units, const DateLUTImpl & time_zone)
+ {
+ const auto & time_data = time_column.getData();
+ size_t size = time_column.size();
+ auto start = ColumnVector<ToType>::create();
+ auto end = ColumnVector<ToType>::create();
+ auto & start_data = start->getData();
+ auto & end_data = end->getData();
+ start_data.resize(size);
+ end_data.resize(size);
+ for (size_t i = 0; i != size; ++i)
+ {
+ start_data[i] = ToStartOfTransform<unit>::execute(time_data[i], num_units, time_zone);
+ end_data[i] = AddTime<unit>::execute(start_data[i], num_units, time_zone);
+ }
+ MutableColumns result;
+ result.emplace_back(std::move(start));
+ result.emplace_back(std::move(end));
+ return ColumnTuple::create(std::move(result));
+ }
+};
+
+template <>
+struct TimeWindowImpl<TUMBLE_START>
+{
+ static constexpr auto name = "tumbleStart";
+
+ static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ if (arguments.size() == 1)
+ {
+ auto type = WhichDataType(arguments[0].type);
+ if (type.isTuple())
+ return std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElement(0);
+ else if (type.isUInt32())
+ return std::make_shared<DataTypeDateTime>();
+ else
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "Illegal type of first argument of function {} should be DateTime, Tuple or UInt32",
+ function_name);
+ }
+ else
+ {
+ return std::static_pointer_cast<const DataTypeTuple>(TimeWindowImpl<TUMBLE>::getReturnType(arguments, function_name))
+ ->getElement(0);
+ }
+ }
+
+ [[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto which_type = WhichDataType(time_column.type);
+ ColumnPtr result_column;
+ if (arguments.size() == 1)
+ {
+ if (which_type.isUInt32())
+ return time_column.column;
+ else //isTuple
+ result_column = time_column.column;
+ }
+ else
+ result_column = TimeWindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
+ return executeWindowBound(result_column, 0, function_name);
+ }
+};
+
+template <>
+struct TimeWindowImpl<TUMBLE_END>
+{
+ static constexpr auto name = "tumbleEnd";
+
+ [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ return TimeWindowImpl<TUMBLE_START>::getReturnType(arguments, function_name);
+ }
+
+ [[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String& function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto which_type = WhichDataType(time_column.type);
+ ColumnPtr result_column;
+ if (arguments.size() == 1)
+ {
+ if (which_type.isUInt32())
+ return time_column.column;
+ else //isTuple
+ result_column = time_column.column;
+ }
+ else
+ result_column = TimeWindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
+ return executeWindowBound(result_column, 1, function_name);
+ }
+};
+
+template <>
+struct TimeWindowImpl<HOP>
+{
+ static constexpr auto name = "hop";
+
+ [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ bool result_type_is_date;
+ IntervalKind interval_kind_1;
+ IntervalKind interval_kind_2;
+
+ if (arguments.size() == 3)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, interval_kind_1, result_type_is_date);
+ checkIntervalArgument(arguments.at(2), function_name, interval_kind_2, result_type_is_date);
+ }
+ else if (arguments.size() == 4)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, interval_kind_1, result_type_is_date);
+ checkIntervalArgument(arguments.at(2), function_name, interval_kind_2, result_type_is_date);
+ checkTimeZoneArgument(arguments.at(3), function_name);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "Number of arguments for function {} doesn't match: passed {}, should be 3 or 4",
+ function_name, arguments.size());
+ }
+
+ if (interval_kind_1 != interval_kind_2)
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal type of window and hop column of function {}, must be same",
+ function_name);
+
+ DataTypePtr data_type = nullptr;
+ if (result_type_is_date)
+ data_type = std::make_shared<DataTypeDate>();
+ else
+ data_type = std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 3, 0, false));
+ return std::make_shared<DataTypeTuple>(DataTypes{data_type, data_type});
+ }
+
+ static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto & hop_interval_column = arguments[1];
+ const auto & window_interval_column = arguments[2];
+ const auto & from_datatype = *time_column.type.get();
+ const auto * time_column_vec = checkAndGetColumn<ColumnDateTime>(time_column.column.get());
+ const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(arguments, 3, 0);
+ if (!WhichDataType(from_datatype).isDateTime() || !time_column_vec)
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column {} argument of function {}. "
+ "Must contain dates or dates with time", time_column.name, function_name);
+
+ auto hop_interval = dispatchForIntervalColumns(hop_interval_column, function_name);
+ auto window_interval = dispatchForIntervalColumns(window_interval_column, function_name);
+
+ if (std::get<1>(hop_interval) > std::get<1>(window_interval))
+ throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+ "Value for hop interval of function {} must not larger than window interval", function_name);
+
+ switch (std::get<0>(window_interval))
+ {
+ //TODO: add proper support for fractional seconds
+// case IntervalKind::Nanosecond:
+// return executeHop<UInt32, IntervalKind::Nanosecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+// case IntervalKind::Microsecond:
+// return executeHop<UInt32, IntervalKind::Microsecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+// case IntervalKind::Millisecond:
+// return executeHop<UInt32, IntervalKind::Millisecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Second:
+ return executeHop<UInt32, IntervalKind::Second>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Minute:
+ return executeHop<UInt32, IntervalKind::Minute>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Hour:
+ return executeHop<UInt32, IntervalKind::Hour>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Day:
+ return executeHop<UInt32, IntervalKind::Day>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Week:
+ return executeHop<UInt16, IntervalKind::Week>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Month:
+ return executeHop<UInt16, IntervalKind::Month>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Quarter:
+ return executeHop<UInt16, IntervalKind::Quarter>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Year:
+ return executeHop<UInt16, IntervalKind::Year>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ default:
+ throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
+ }
+ UNREACHABLE();
+ }
+
+ template <typename ToType, IntervalKind::Kind kind>
+ static ColumnPtr
+ executeHop(const ColumnDateTime & time_column, UInt64 hop_num_units, UInt64 window_num_units, const DateLUTImpl & time_zone)
+ {
+ const auto & time_data = time_column.getData();
+ size_t size = time_column.size();
+ auto start = ColumnVector<ToType>::create();
+ auto end = ColumnVector<ToType>::create();
+ auto & start_data = start->getData();
+ auto & end_data = end->getData();
+ start_data.resize(size);
+ end_data.resize(size);
+
+ for (size_t i = 0; i < size; ++i)
+ {
+ ToType wstart = ToStartOfTransform<kind>::execute(time_data[i], hop_num_units, time_zone);
+ ToType wend = AddTime<kind>::execute(wstart, hop_num_units, time_zone);
+ wstart = AddTime<kind>::execute(wend, -window_num_units, time_zone);
+ ToType wend_latest;
+
+ do
+ {
+ wend_latest = wend;
+ wend = AddTime<kind>::execute(wend, -hop_num_units, time_zone);
+ } while (wend > time_data[i]);
+
+ end_data[i] = wend_latest;
+ start_data[i] = AddTime<kind>::execute(wend_latest, -window_num_units, time_zone);
+ }
+ MutableColumns result;
+ result.emplace_back(std::move(start));
+ result.emplace_back(std::move(end));
+ return ColumnTuple::create(std::move(result));
+ }
+};
+
+template <>
+struct TimeWindowImpl<WINDOW_ID>
+{
+ static constexpr auto name = "windowID";
+
+ [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ bool result_type_is_date;
+ IntervalKind interval_kind_1;
+ IntervalKind interval_kind_2;
+
+ if (arguments.size() == 2)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, interval_kind_1, result_type_is_date);
+ }
+ else if (arguments.size() == 3)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, interval_kind_1, result_type_is_date);
+ if (checkIntervalOrTimeZoneArgument(arguments.at(2), function_name, interval_kind_2, result_type_is_date))
+ {
+ if (interval_kind_1 != interval_kind_2)
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal type of window and hop column of function {}, must be same",
+ function_name);
+ }
+ }
+ else if (arguments.size() == 4)
+ {
+ checkFirstArgument(arguments.at(0), function_name);
+ checkIntervalArgument(arguments.at(1), function_name, interval_kind_1, result_type_is_date);
+ checkIntervalArgument(arguments.at(2), function_name, interval_kind_2, result_type_is_date);
+ checkTimeZoneArgument(arguments.at(3), function_name);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "Number of arguments for function {} doesn't match: passed {}, should be 2, 3 or 4",
+ function_name, arguments.size());
+ }
+
+ if (result_type_is_date)
+ return std::make_shared<DataTypeUInt16>();
+ else
+ return std::make_shared<DataTypeUInt32>();
+ }
+
+ [[maybe_unused]] static ColumnPtr
+ dispatchForHopColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto & hop_interval_column = arguments[1];
+ const auto & window_interval_column = arguments[2];
+ const auto & from_datatype = *time_column.type.get();
+ const auto * time_column_vec = checkAndGetColumn<ColumnDateTime>(time_column.column.get());
+ const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(arguments, 3, 0);
+ if (!WhichDataType(from_datatype).isDateTime() || !time_column_vec)
+ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column {} argument of function {}. "
+ "Must contain dates or dates with time", time_column.name, function_name);
+
+ auto hop_interval = dispatchForIntervalColumns(hop_interval_column, function_name);
+ auto window_interval = dispatchForIntervalColumns(window_interval_column, function_name);
+
+ if (std::get<1>(hop_interval) > std::get<1>(window_interval))
+ throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+ "Value for hop interval of function {} must not larger than window interval", function_name);
+
+ switch (std::get<0>(window_interval))
+ {
+ //TODO: add proper support for fractional seconds
+// case IntervalKind::Nanosecond:
+// return executeHopSlice<UInt32, IntervalKind::Nanosecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+// case IntervalKind::Microsecond:
+// return executeHopSlice<UInt32, IntervalKind::Microsecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+// case IntervalKind::Millisecond:
+// return executeHopSlice<UInt32, IntervalKind::Millisecond>(
+// *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Second:
+ return executeHopSlice<UInt32, IntervalKind::Second>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Minute:
+ return executeHopSlice<UInt32, IntervalKind::Minute>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Hour:
+ return executeHopSlice<UInt32, IntervalKind::Hour>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Day:
+ return executeHopSlice<UInt32, IntervalKind::Day>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Week:
+ return executeHopSlice<UInt16, IntervalKind::Week>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Month:
+ return executeHopSlice<UInt16, IntervalKind::Month>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Quarter:
+ return executeHopSlice<UInt16, IntervalKind::Quarter>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ case IntervalKind::Year:
+ return executeHopSlice<UInt16, IntervalKind::Year>(
+ *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone);
+ default:
+ throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
+ }
+ UNREACHABLE();
+ }
+
+ template <typename ToType, IntervalKind::Kind kind>
+ static ColumnPtr
+ executeHopSlice(const ColumnDateTime & time_column, UInt64 hop_num_units, UInt64 window_num_units, const DateLUTImpl & time_zone)
+ {
+ Int64 gcd_num_units = std::gcd(hop_num_units, window_num_units);
+
+ const auto & time_data = time_column.getData();
+ size_t size = time_column.size();
+
+ auto end = ColumnVector<ToType>::create();
+ auto & end_data = end->getData();
+ end_data.resize(size);
+ for (size_t i = 0; i < size; ++i)
+ {
+ ToType wstart = ToStartOfTransform<kind>::execute(time_data[i], hop_num_units, time_zone);
+ ToType wend = AddTime<kind>::execute(wstart, hop_num_units, time_zone);
+ ToType wend_latest;
+
+ do
+ {
+ wend_latest = wend;
+ wend = AddTime<kind>::execute(wend, -gcd_num_units, time_zone);
+ } while (wend > time_data[i]);
+
+ end_data[i] = wend_latest;
+ }
+ return end;
+ }
+
+ [[maybe_unused]] static ColumnPtr
+ dispatchForTumbleColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ ColumnPtr column = TimeWindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
+ return executeWindowBound(column, 1, function_name);
+ }
+
+ [[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ if (arguments.size() == 2)
+ return dispatchForTumbleColumns(arguments, function_name);
+ else
+ {
+ const auto & third_column = arguments[2];
+ if (arguments.size() == 3 && WhichDataType(third_column.type).isString())
+ return dispatchForTumbleColumns(arguments, function_name);
+ else
+ return dispatchForHopColumns(arguments, function_name);
+ }
+ }
+};
+
+template <>
+struct TimeWindowImpl<HOP_START>
+{
+ static constexpr auto name = "hopStart";
+
+ static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ if (arguments.size() == 1)
+ {
+ auto type = WhichDataType(arguments[0].type);
+ if (type.isTuple())
+ return std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElement(0);
+ else if (type.isUInt32())
+ return std::make_shared<DataTypeDateTime>();
+ else
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "Illegal type of first argument of function {} should be DateTime, Tuple or UInt32",
+ function_name);
+ }
+ else
+ {
+ return std::static_pointer_cast<const DataTypeTuple>(TimeWindowImpl<HOP>::getReturnType(arguments, function_name))->getElement(0);
+ }
+ }
+
+ [[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto which_type = WhichDataType(time_column.type);
+ ColumnPtr result_column;
+ if (arguments.size() == 1)
+ {
+ if (which_type.isUInt32())
+ return time_column.column;
+ else //isTuple
+ result_column = time_column.column;
+ }
+ else
+ result_column = TimeWindowImpl<HOP>::dispatchForColumns(arguments, function_name);
+ return executeWindowBound(result_column, 0, function_name);
+ }
+};
+
+template <>
+struct TimeWindowImpl<HOP_END>
+{
+ static constexpr auto name = "hopEnd";
+
+ [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ return TimeWindowImpl<HOP_START>::getReturnType(arguments, function_name);
+ }
+
+ [[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
+ {
+ const auto & time_column = arguments[0];
+ const auto which_type = WhichDataType(time_column.type);
+ ColumnPtr result_column;
+ if (arguments.size() == 1)
+ {
+ if (which_type.isUInt32())
+ return time_column.column;
+ else //isTuple
+ result_column = time_column.column;
+ }
+ else
+ result_column = TimeWindowImpl<HOP>::dispatchForColumns(arguments, function_name);
+
+ return executeWindowBound(result_column, 1, function_name);
+ }
+};
+
+template <TimeWindowFunctionName type>
+DataTypePtr FunctionTimeWindow<type>::getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const
+{
+ return TimeWindowImpl<type>::getReturnType(arguments, name);
+}
+
+template <TimeWindowFunctionName type>
+ColumnPtr FunctionTimeWindow<type>::executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /*result_type*/, size_t /*input_rows_count*/) const
+{
+ return TimeWindowImpl<type>::dispatchForColumns(arguments, name);
+}
+
+REGISTER_FUNCTION(TimeWindow)
+{
+ factory.registerFunction<FunctionTumble>();
+ factory.registerFunction<FunctionHop>();
+ factory.registerFunction<FunctionTumbleStart>();
+ factory.registerFunction<FunctionTumbleEnd>();
+ factory.registerFunction<FunctionHopStart>();
+ factory.registerFunction<FunctionHopEnd>();
+ factory.registerFunction<FunctionWindowId>();
+}
+}