aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: hcpp <hcpp@ydb.tech> 2022-12-10 09:57:44 +0300
committer: hcpp <hcpp@ydb.tech> 2022-12-10 09:57:44 +0300
commit: a8aefff09fd4d9e76262100a23301e19259f8a3c (patch)
tree: 8ab22dc7b2f107b7ac5a79f80e8d9cea3252fb5c
parent: 1bb7312173c0162299689aaecb798070864bd9b8 (diff)
download: ydb-a8aefff09fd4d9e76262100a23301e19259f8a3c.tar.gz
format setting has been supported for timestamp and datetime
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.cpp57
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.h4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/util.cpp21
-rw-r--r--ydb/core/yq/libs/control_plane_storage/util.h4
-rw-r--r--ydb/library/yql/providers/common/mkql/parser.cpp26
-rw-r--r--ydb/library/yql/providers/common/mkql/parser.h1
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp35
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.h4
-rw-r--r--ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json3
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp59
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp16
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h20
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp49
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp56
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp53
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp54
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp45
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h3
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp32
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp54
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h20
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h147
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h8
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp26
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h27
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h10
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp66
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h12
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp42
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h8
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp18
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h6
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp4
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h2
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp2
40 files changed, 897 insertions, 111 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
index 75b3c3e7942..a9ccae0b9a2 100644
--- a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
@@ -105,8 +105,60 @@ NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& se
return issues;
}
+// Validates the date/time related entries of a format_setting map.
+// Reports an unknown data.datetime.format_name / data.timestamp.format_name value and the use of a
+// *.format_name key together with its *.format counterpart. Any other key is reported as unknown
+// only when matchAllSettings is true; otherwise it is ignored here.
+NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {
+    NYql::TIssues result;
+    TSet<TString> seenKeys;
+
+    // Reports a conflict if the counterpart key was visited earlier, then remembers the current key.
+    // The check is done from both sides, so the outcome does not depend on the map's iteration order.
+    const auto registerKey = [&result, &seenKeys](const TString& current, const TString& counterpart, const TString& conflictMessage) {
+        if (seenKeys.contains(counterpart)) {
+            result.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, conflictMessage));
+        }
+        seenKeys.insert(current);
+    };
+
+    for (const auto& [name, setting]: formatSetting) {
+        if (name == "data.datetime.format_name"sv) {
+            if (!IsValidDateTimeFormatName(setting)) {
+                result.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.datetime.format_name " + setting));
+            }
+            registerKey("data.datetime.format_name", "data.datetime.format", "Don't use data.datetime.format_name and data.datetime.format together");
+        } else if (name == "data.datetime.format"sv) {
+            registerKey("data.datetime.format", "data.datetime.format_name", "Don't use data.datetime.format_name and data.datetime.format together");
+        } else if (name == "data.timestamp.format_name"sv) {
+            if (!IsValidTimestampFormatName(setting)) {
+                result.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.timestamp.format_name " + setting));
+            }
+            registerKey("data.timestamp.format_name", "data.timestamp.format", "Don't use data.timestamp.format_name and data.timestamp.format together");
+        } else if (name == "data.timestamp.format"sv) {
+            registerKey("data.timestamp.format", "data.timestamp.format_name", "Don't use data.timestamp.format_name and data.timestamp.format together");
+        } else if (matchAllSettings) {
+            result.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + name));
+        }
+    }
+
+    return result;
+}
+
+
NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
NYql::TIssues issues;
+ TSet<TString> conflictingKeys;
+ issues.AddIssues(ValidateDateFormatSetting(formatSetting));
for (const auto& [key, value]: formatSetting) {
if (key == "file_pattern"sv) {
continue;
@@ -118,6 +170,11 @@ NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobu
}
continue;
}
+
+ if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) {
+ continue;
+ }
+
if (key == "csv_delimiter"sv) {
if (format != "csv_with_names"sv) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names"));
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h
index 849e61cb5c5..98438c38d17 100644
--- a/ydb/core/yq/libs/control_plane_storage/request_validators.h
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h
@@ -78,6 +78,8 @@ NYql::TIssues ValidateQuery(const T& ev, size_t maxSize)
NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting);
+
+NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);
NYql::TIssues ValidateProjectionColumns(const YandexQuery::Schema& schema, const TVector<TString>& partitionedBy);
template<typename T>
@@ -111,7 +113,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<YandexQuer
if (!dataStreams.has_schema()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
}
- issues.AddIssues(ValidateFormatSetting(dataStreams.format(), dataStreams.format_setting()));
+ issues.AddIssues(ValidateDateFormatSetting(dataStreams.format_setting(), true));
break;
}
case YandexQuery::BindingSetting::BINDING_NOT_SET: {
diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp
index 169194601dc..f32f8b8be54 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/util.cpp
@@ -189,7 +189,7 @@ std::pair<TString, TString> SplitId(const TString& id, char delim) {
}
bool IsValidIntervalUnit(const TString& unit) {
- static constexpr std::array<std::string_view, 10> IntervalUnits = {
+ static constexpr std::array<std::string_view, 7> IntervalUnits = {
"MICROSECONDS"sv,
"MILLISECONDS"sv,
"SECONDS"sv,
@@ -201,4 +201,23 @@ bool IsValidIntervalUnit(const TString& unit) {
return IsIn(IntervalUnits, unit);
}
+// Returns true when formatName is one of the predefined datetime format names listed below.
+bool IsValidDateTimeFormatName(const TString& formatName) {
+    static constexpr std::array<std::string_view, 2> KnownNames{"POSIX"sv, "ISO"sv};
+    return IsIn(KnownNames, formatName);
+}
+
+// Returns true when formatName is one of the predefined timestamp format names listed below.
+bool IsValidTimestampFormatName(const TString& formatName) {
+    static constexpr std::array<std::string_view, 5> KnownNames{
+        "POSIX"sv, "ISO"sv, "UNIX_TIME_MILLISECONDS"sv, "UNIX_TIME_SECONDS"sv, "UNIX_TIME_MICROSECONDS"sv};
+    return IsIn(KnownNames, formatName);
+}
+
} //namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h
index ca100195757..af8dbd27978 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.h
+++ b/ydb/core/yq/libs/control_plane_storage/util.h
@@ -48,4 +48,8 @@ std::pair<TString, TString> SplitId(const TString& id, char delim = '-');
bool IsValidIntervalUnit(const TString& unit);
+bool IsValidDateTimeFormatName(const TString& formatName);
+
+bool IsValidTimestampFormatName(const TString& formatName);
+
} // namespace NYq
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp
index 5fbbc9df26b..cbe39bbc3e5 100644
--- a/ydb/library/yql/providers/common/mkql/parser.cpp
+++ b/ydb/library/yql/providers/common/mkql/parser.cpp
@@ -143,6 +143,7 @@ TRuntimeNode BuildParseCall(
std::unordered_map<TString, ui32>&& metadataColumns,
const std::string_view& format,
const std::string_view& compression,
+ const std::vector<std::pair<std::string_view, std::string_view>>& formatSettings,
TType* inputType,
TType* parseItemType,
TType* finalItemType,
@@ -274,13 +275,28 @@ TRuntimeNode BuildParseCall(
inputDataType = inputItemTuple->GetElementType(0);
}
+ TString settingsAsJson;
+ TStringOutput stream(settingsAsJson);
+ NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
+ writer.OpenMap();
+
+ for (const auto& v : formatSettings) {
+ writer.Write(v.first, v.second);
+ }
+
+ writer.CloseMap();
+ writer.Flush();
+ if (settingsAsJson == "{}") {
+ settingsAsJson.clear();
+ }
+
const auto userType = ctx.ProgramBuilder.NewTupleType({
ctx.ProgramBuilder.NewTupleType({inputType}),
ctx.ProgramBuilder.NewStructType({}),
userOutputType});
input = TType::EKind::Resource == inputDataType->GetKind() ?
ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseBlocks", {}, userType), {input})):
- ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format), {input}));
+ ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format + settingsAsJson), {input}));
}
return ctx.ProgramBuilder.ExpandMap(input,
@@ -348,6 +364,13 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon
}
}
+ std::vector<std::pair<std::string_view, std::string_view>> formatSettings;
+ if (auto settings = GetSetting(wrapper.Settings().Cast().Ref(), "formatSettings")) {
+ settings->Tail().ForEachChild([&](const TExprNode& v) {
+ formatSettings.emplace_back(v.Child(0)->Content(), v.Child(1)->Content());
+ });
+ }
+
auto parsedItems = rowType->GetItems();
EraseIf(parsedItems, [extraType, &metadataColumns](const auto& item) {
return extraType && extraType->FindItem(item->GetName()) || metadataColumns.contains(TString(item->GetName()));
@@ -372,6 +395,7 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon
std::move(metadataColumns),
format.Content() + settings.front(),
settings.back(),
+ formatSettings,
inputType,
parseItemType,
finalItemType,
diff --git a/ydb/library/yql/providers/common/mkql/parser.h b/ydb/library/yql/providers/common/mkql/parser.h
index 30d023b2a0a..cf35d6447bd 100644
--- a/ydb/library/yql/providers/common/mkql/parser.h
+++ b/ydb/library/yql/providers/common/mkql/parser.h
@@ -20,6 +20,7 @@ NKikimr::NMiniKQL::TRuntimeNode BuildParseCall(
std::unordered_map<TString, ui32>&& metadataColumns,
const std::string_view& format,
const std::string_view& compression,
+ const std::vector<std::pair<std::string_view, std::string_view>>& formatSettings,
NKikimr::NMiniKQL::TType* inputType,
NKikimr::NMiniKQL::TType* parseItemType,
NKikimr::NMiniKQL::TType* finalItemType,
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 16e5129cf0c..ffc54481edc 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -49,7 +49,7 @@ namespace {
"bzip2"sv,
"xz"sv
};
- constexpr std::array<std::string_view, 10> IntervalUnits = {
+ constexpr std::array<std::string_view, 7> IntervalUnits = {
"MICROSECONDS"sv,
"MILLISECONDS"sv,
"SECONDS"sv,
@@ -58,6 +58,18 @@ namespace {
"DAYS"sv,
"WEEKS"sv
};
+ constexpr std::array<std::string_view, 2> DateTimeFormatNames = {
+ "POSIX"sv,
+ "ISO"sv
+ };
+
+ constexpr std::array<std::string_view, 5> TimestampFormatNames = {
+ "POSIX"sv,
+ "ISO"sv,
+ "UNIX_TIME_MILLISECONDS"sv,
+ "UNIX_TIME_SECONDS"sv,
+ "UNIX_TIME_MICROSECONDS"sv
+ };
} // namespace
bool TCommitSettings::EnsureModeEmpty(TExprContext& ctx) {
@@ -1163,14 +1175,27 @@ bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
return false;
}
-bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) {
- if (unit.empty() || IsIn(IntervalUnits, unit)) {
+template<typename T>
+bool ValidateValueInDictionary(std::string_view value, TExprContext& ctx, const T& dictionary) { // true when value is empty or IsIn(dictionary, value); otherwise adds an "Unknown format" error listing the dictionary entries to ctx and returns false
+ if (value.empty() || IsIn(dictionary, value)) {
return true;
}
- ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << unit
- << ". Use one of: " << JoinSeq(", ", IntervalUnits)));
+ ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << value
+ << ". Use one of: " << JoinSeq(", ", dictionary)));
return false;
}
+bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) { // checks unit against IntervalUnits; an empty unit is accepted
+ return ValidateValueInDictionary(unit, ctx, IntervalUnits);
+}
+
+bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx) { // checks formatName against DateTimeFormatNames; an empty name is accepted
+ return ValidateValueInDictionary(formatName, ctx, DateTimeFormatNames);
+}
+
+bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx) { // checks formatName against TimestampFormatNames; an empty name is accepted
+ return ValidateValueInDictionary(formatName, ctx, TimestampFormatNames);
+}
+
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index cf2f257aac7..87242eab37f 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -141,7 +141,9 @@ bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ct
bool ValidateFormatForInput(std::string_view format, TExprContext& ctx);
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);
-bool ValidateIntervalUnit(std::string_view format, TExprContext& ctx);
+bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx);
+bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx);
+bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx);
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
index bad47e9d70e..e43a69ba175 100644
--- a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
+++ b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
@@ -58,7 +58,8 @@
{"Index": 3, "Name": "Columns", "Type": "TExprBase"},
{"Index": 4, "Name": "Format", "Type": "TCoAtom"},
{"Index": 5, "Name": "Compression", "Type": "TCoAtom"},
- {"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true}
+ {"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true},
+ {"Index": 7, "Name": "Settings", "Type": "TExprList", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
index 582456cc1a5..bb15fd399ba 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
@@ -125,12 +125,69 @@ public:
format = "raw";
}
+ auto settings = Build<TExprList>(ctx, read.Pos());
+ bool hasDateTimeFormat = false;
+ bool hasDateTimeFormatName = false;
+ bool hasTimestampFormat = false;
+ bool hasTimestampFormatName = false;
+ if (topicKeyParser.GetDateTimeFormatName()) {
+ settings.Add(topicKeyParser.GetDateTimeFormatName());
+ hasDateTimeFormatName = true;
+ if (!NCommon::ValidateDateTimeFormatName(topicKeyParser.GetDateTimeFormatName()->Child(1)->Content(), ctx)) {
+ return nullptr;
+ }
+ }
+
+ if (topicKeyParser.GetDateTimeFormat()) {
+ settings.Add(topicKeyParser.GetDateTimeFormat());
+ hasDateTimeFormat = true;
+ }
+
+ // Forward the user-supplied timestamp format name and validate its value.
+ if (topicKeyParser.GetTimestampFormatName()) {
+     settings.Add(topicKeyParser.GetTimestampFormatName());
+     // Must set the *timestamp* flag: the conflict check against data.timestamp.format and the
+     // POSIX default for timestamps below both read hasTimestampFormatName. Setting the datetime
+     // flag here produced false datetime conflicts and a duplicate timestamp default.
+     hasTimestampFormatName = true;
+     if (!NCommon::ValidateTimestampFormatName(topicKeyParser.GetTimestampFormatName()->Child(1)->Content(), ctx)) {
+         return nullptr;
+     }
+ }
+
+ if (topicKeyParser.GetTimestampFormat()) {
+ settings.Add(topicKeyParser.GetTimestampFormat());
+ hasTimestampFormat = true;
+ }
+
+ if (hasDateTimeFormat && hasDateTimeFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together"));
+ return nullptr;
+ }
+
+ if (hasTimestampFormat && hasTimestampFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together"));
+ return nullptr;
+ }
+
+ if (!hasDateTimeFormat && !hasDateTimeFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(read.Pos(), "data.datetime.formatname"));
+ pair.push_back(ctx.NewAtom(read.Pos(), "POSIX"));
+ settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
+ }
+
+ if (!hasTimestampFormat && !hasTimestampFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(read.Pos(), "data.timestamp.formatname"));
+ pair.push_back(ctx.NewAtom(read.Pos(), "POSIX"));
+ settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
+ }
+
auto builder = Build<TPqReadTopic>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Topic(std::move(topicNode))
.Format().Value(format).Build()
- .Compression().Value(topicKeyParser.GetCompression()).Build();
+ .Compression().Value(topicKeyParser.GetCompression()).Build()
+ .LimitHint<TCoVoid>().Build()
+ .Settings(settings.Done());
if (topicKeyParser.GetColumnOrder()) {
builder.Columns(topicKeyParser.GetColumnOrder());
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
index 639dcf3024e..2680ddb56b4 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
@@ -85,7 +85,7 @@ public:
}
TStatus HandleReadTopic(TExprBase input, TExprContext& ctx) {
- if (!EnsureMinMaxArgsCount(input.Ref(), 6, 7, ctx)) {
+ if (!EnsureMinMaxArgsCount(input.Ref(), 6, 8, ctx)) {
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index fbd03b35c7a..a9cfab9cee8 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -111,6 +111,12 @@ public:
.Value(ctx.NewList(pqReadTopic.Pos(), std::move(metadataFieldsList)))
.Done());
+
+ settings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos())
+ .Name().Build("formatSettings")
+ .Value(std::move(pqReadTopic.Settings()))
+ .Done());
+
const auto token = "cluster:default_" + clusterName;
auto columns = pqReadTopic.Columns().Ptr();
if (!columns->IsList()) {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp
index 0864b7b6170..2b3d9a8190a 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp
@@ -51,6 +51,22 @@ bool TTopicKeyParser::Parse(const TExprNode& expr, TExprNode::TPtr readSettings,
Compression = readSettings->Child(i)->Child(1)->Content();
continue;
}
+ if (readSettings->Child(i)->Head().IsAtom("data.datetime.formatname")) {
+ DateTimeFormatName = readSettings->Child(i);
+ continue;
+ }
+ if (readSettings->Child(i)->Head().IsAtom("data.datetime.format")) {
+ DateTimeFormat = readSettings->Child(i);
+ continue;
+ }
+ if (readSettings->Child(i)->Head().IsAtom("data.timestamp.formatname")) {
+ TimestampFormatName = readSettings->Child(i);
+ continue;
+ }
+ if (readSettings->Child(i)->Head().IsAtom("data.timestamp.format")) {
+ TimestampFormat = readSettings->Child(i);
+ continue;
+ }
}
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h
index b22f61125fd..2fabecb2765 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h
@@ -30,6 +30,22 @@ public:
return Compression;
}
+ // Accessors for the date/time format setting nodes captured by Parse().
+ // Each returns the stored setting node as-is; presumably empty when Parse() did not see the setting.
+ // All four are const: they only read a member, matching GetDateTimeFormatName().
+
+ // Node of the "data.datetime.formatname" read setting.
+ TExprNode::TPtr GetDateTimeFormatName() const {
+     return DateTimeFormatName;
+ }
+
+ // Node of the "data.datetime.format" read setting.
+ TExprNode::TPtr GetDateTimeFormat() const {
+     return DateTimeFormat;
+ }
+
+ // Node of the "data.timestamp.formatname" read setting.
+ TExprNode::TPtr GetTimestampFormatName() const {
+     return TimestampFormatName;
+ }
+
+ // Node of the "data.timestamp.format" read setting.
+ TExprNode::TPtr GetTimestampFormat() const {
+     return TimestampFormat;
+ }
+
bool Parse(const TExprNode& expr, TExprNode::TPtr readSettings, TExprContext& ctx);
private:
@@ -40,6 +56,10 @@ private:
TString TopicPath;
TString Format;
TString Compression;
+ TExprNode::TPtr DateTimeFormatName;
+ TExprNode::TPtr DateTimeFormat;
+ TExprNode::TPtr TimestampFormatName;
+ TExprNode::TPtr TimestampFormat;
TExprNode::TPtr UserSchema;
TExprNode::TPtr ColumnOrder;
};
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 8127b8af3c3..c0db287dd0c 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1231,6 +1231,31 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializat
return nullptr;
}
+// Maps a datetime format name to the matching NDB::FormatSettings::DateTimeFormat value.
+// Names that are not in the table map to DateTimeFormat::Unspecified.
+NDB::FormatSettings::DateTimeFormat ToDateTimeFormat(const TString& formatName) {
+    using EFormat = NDB::FormatSettings::DateTimeFormat;
+    static const TMap<TString, EFormat> knownFormats{
+        {"POSIX", EFormat::POSIX},
+        {"ISO", EFormat::ISO}
+    };
+    const auto found = knownFormats.find(formatName);
+    return found == knownFormats.end() ? EFormat::Unspecified : found->second;
+}
+
+// Maps a timestamp format name to the matching NDB::FormatSettings::TimestampFormat value.
+// Names that are not in the table map to TimestampFormat::Unspecified.
+NDB::FormatSettings::TimestampFormat ToTimestampFormat(const TString& formatName) {
+    using EFormat = NDB::FormatSettings::TimestampFormat;
+    static const TMap<TString, EFormat> knownFormats{
+        {"POSIX", EFormat::POSIX},
+        {"ISO", EFormat::ISO},
+        {"UNIX_TIME_MILLISECONDS", EFormat::UnixTimeMilliseconds},
+        {"UNIX_TIME_SECONDS", EFormat::UnixTimeSeconds},
+        {"UNIX_TIME_MICROSECONDS", EFormat::UnixTimeMicroSeconds}
+    };
+    const auto found = knownFormats.find(formatName);
+    return found == knownFormats.end() ? EFormat::Unspecified : found->second;
+}
+
} // namespace
using namespace NKikimr::NMiniKQL;
@@ -1317,6 +1342,30 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("csvdelimiter"); settings.cend() != it && !it->second.empty())
readSpec->Settings.csv.delimiter = it->second[0];
+ if (const auto it = settings.find("data.datetime.formatname"); settings.cend() != it) {
+ readSpec->Settings.date_time_format_name = ToDateTimeFormat(it->second);
+ }
+
+ if (const auto it = settings.find("data.datetime.format"); settings.cend() != it) {
+ readSpec->Settings.date_time_format = it->second;
+ }
+
+ if (const auto it = settings.find("data.timestamp.formatname"); settings.cend() != it) {
+ readSpec->Settings.timestamp_format_name = ToTimestampFormat(it->second);
+ }
+
+ if (const auto it = settings.find("data.timestamp.format"); settings.cend() != it) {
+ readSpec->Settings.timestamp_format = it->second;
+ }
+
+ if (readSpec->Settings.date_time_format_name == NDB::FormatSettings::DateTimeFormat::Unspecified && readSpec->Settings.date_time_format.empty()) {
+ readSpec->Settings.date_time_format_name = NDB::FormatSettings::DateTimeFormat::POSIX;
+ }
+
+ if (readSpec->Settings.timestamp_format_name == NDB::FormatSettings::TimestampFormat::Unspecified && readSpec->Settings.timestamp_format.empty()) {
+ readSpec->Settings.timestamp_format_name = NDB::FormatSettings::TimestampFormat::POSIX;
+ }
+
#define SUPPORTED_FLAGS(xx) \
xx(skip_unknown_fields, true) \
xx(import_nested_json, true) \
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 6626328e136..cf888e5e8d0 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -115,6 +115,10 @@ private:
if (!EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx))
return TStatus::Error;
+ bool hasDateTimeFormat = false;
+ bool hasDateTimeFormatName = false;
+ bool hasTimestampFormat = false;
+ bool hasTimestampFormatName = false;
const auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
if (name == "compression") {
const auto& value = setting.Tail();
@@ -151,6 +155,46 @@ private:
return EnsureValidUserSchemaSetting(setting, ctx);
}
+ if (name == "data.datetime.formatname") {
+ hasDateTimeFormatName = true;
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ return NCommon::ValidateDateTimeFormatName(value.Content(), ctx);
+ }
+
+ if (name == "data.timestamp.formatname") {
+ hasTimestampFormatName = true;
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ return NCommon::ValidateTimestampFormatName(value.Content(), ctx);
+ }
+
+ if (name == "data.datetime.format") {
+ hasDateTimeFormat = true;
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ if (name == "data.timestamp.format") {
+ hasTimestampFormat = true;
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ return true;
+ }
+
if (name == "csvdelimiter") {
const auto& value = setting.Tail();
if (!EnsureAtom(value, ctx)) {
@@ -172,7 +216,17 @@ private:
return true;
};
- if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter", "filepattern"}, validator, ctx)) {
+ if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "csvdelimiter", "filepattern"}, validator, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (hasDateTimeFormat && hasDateTimeFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.datetime.format_name and data.datetime.format together"));
+ return TStatus::Error;
+ }
+
+ if (hasTimestampFormat && hasTimestampFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together"));
return TStatus::Error;
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 25c651a1e24..07703efa8fc 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -293,6 +293,10 @@ public:
if (input->ChildrenSize() > TS3Object::idx_Settings) {
bool haveProjection = false;
bool havePartitionedBy = false;
+ bool hasDateTimeFormat = false;
+ bool hasDateTimeFormatName = false;
+ bool hasTimestampFormat = false;
+ bool hasTimestampFormatName = false;
auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
if (name != "partitionedby"sv && name != "directories"sv && setting.ChildrenSize() != 2) {
ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()),
@@ -339,6 +343,42 @@ public:
return NCommon::ValidateIntervalUnit(unit, ctx);
}
+ if (name == "data.datetime.formatname"sv) {
+ hasDateTimeFormatName = true;
+ TStringBuf formatName;
+ if (!ExtractSettingValue(setting.Tail(), "data.datetime.format_name"sv, format, {}, ctx, formatName)) {
+ return false;
+ }
+ return NCommon::ValidateDateTimeFormatName(formatName, ctx);
+ }
+
+ if (name == "data.datetime.format"sv) {
+ hasDateTimeFormat = true;
+ TStringBuf unused;
+ if (!ExtractSettingValue(setting.Tail(), "data.datetime.format"sv, format, {}, ctx, unused)) {
+ return false;
+ }
+ return true;
+ }
+
+ if (name == "data.timestamp.formatname"sv) {
+ hasTimestampFormatName = true;
+ TStringBuf formatName;
+ if (!ExtractSettingValue(setting.Tail(), "data.timestamp.format_name"sv, format, {}, ctx, formatName)) {
+ return false;
+ }
+ return NCommon::ValidateTimestampFormatName(formatName, ctx);
+ }
+
+ if (name == "data.timestamp.format"sv) {
+ hasTimestampFormat = true;
+ TStringBuf unused;
+ if (!ExtractSettingValue(setting.Tail(), "data.timestamp.format"sv, format, {}, ctx, unused)) {
+ return false;
+ }
+ return true;
+ }
+
if (name == "readmaxbytes"sv) {
TStringBuf unused;
if (!ExtractSettingValue(setting.Tail(), "read_max_bytes"sv, format, "raw"sv, ctx, unused)) {
@@ -392,7 +432,8 @@ public:
};
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
{ "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv,
- "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx))
+ "data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv,
+ "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx))
{
return TStatus::Error;
}
@@ -400,6 +441,16 @@ public:
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Missing partitioned_by setting for projection"));
return TStatus::Error;
}
+
+ if (hasDateTimeFormat && hasDateTimeFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.datetime.format_name and data.datetime.format together"));
+ return TStatus::Error;
+ }
+
+ if (hasTimestampFormat && hasTimestampFormatName) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together"));
+ return TStatus::Error;
+ }
}
input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 59b8e3e3038..290bb60cbd8 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -37,6 +37,22 @@ TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) {
return FindChild(settings, "csvdelimiter"sv);
}
+// Looks up the "data.datetime.formatname" entry of `settings` via FindChild.
+// The caller in this file tests the returned pointer for truthiness, so the
+// result is presumably null when the setting is absent (FindChild is defined
+// elsewhere -- confirm).
+TExprNode::TPtr GetDateTimeFormatName(const TExprNode& settings) {
+ return FindChild(settings, "data.datetime.formatname"sv);
+}
+
+// Looks up the "data.datetime.format" entry of `settings` via FindChild.
+// The caller in this file tests the returned pointer for truthiness, so the
+// result is presumably null when the setting is absent (FindChild is defined
+// elsewhere -- confirm).
+TExprNode::TPtr GetDateTimeFormat(const TExprNode& settings) {
+ return FindChild(settings, "data.datetime.format"sv);
+}
+
+// Looks up the "data.timestamp.formatname" entry of `settings` via FindChild.
+// The caller in this file tests the returned pointer for truthiness, so the
+// result is presumably null when the setting is absent (FindChild is defined
+// elsewhere -- confirm).
+TExprNode::TPtr GetTimestampFormatName(const TExprNode& settings) {
+ return FindChild(settings, "data.timestamp.formatname"sv);
+}
+
+// Looks up the "data.timestamp.format" entry of `settings` via FindChild.
+// The caller in this file tests the returned pointer for truthiness, so the
+// result is presumably null when the setting is absent (FindChild is defined
+// elsewhere -- confirm).
+TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) {
+ return FindChild(settings, "data.timestamp.format"sv);
+}
+
TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
if (partBy) {
auto children = partBy->ChildrenList();
@@ -111,6 +127,44 @@ public:
sinkOutputSettingsBuilder.Add(std::move(csvDelimiter));
}
+ bool hasDateTimeFormat = false;
+ bool hasDateTimeFormatName = false;
+ bool hasTimestampFormat = false;
+ bool hasTimestampFormatName = false;
+ if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName));
+ hasDateTimeFormatName = true;
+ }
+
+ if (auto dateTimeFormat = GetDateTimeFormat(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat));
+ hasDateTimeFormat = true;
+ }
+
+ if (auto timestampFormatName = GetTimestampFormatName(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(timestampFormatName));
+ hasTimestampFormatName = true;
+ }
+
+ if (auto timestampFormat = GetTimestampFormat(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(timestampFormat));
+ hasTimestampFormat = true;
+ }
+
+ if (!hasDateTimeFormat && !hasDateTimeFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.datetime.formatname"));
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX"));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+
+ if (!hasTimestampFormat && !hasTimestampFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.timestamp.formatname"));
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX"));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
return keys.empty() ?
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp b/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp
index 28480e05df9..110f83b24f1 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp
@@ -477,7 +477,7 @@ JSON::Pos JSON::searchField(const char * data, size_t size) const
{
if (static_cast<int>(size) + 2 > it->dataEnd() - it->data())
continue;
- if (!strncmp(data, it->data() + 1, size))
+ if (!strncmp(data, it->data() + 1, size) && *(it->data() + size + 1) == '"')
break;
}
else
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index 5d8cb4a0849..556648d6915 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -798,6 +798,31 @@ private:
size_t CurrentRow = 0U;
};
+// Maps a datetime format name (the value of the "data.datetime.formatname"
+// JSON setting read by GetFormatSettings) to the corresponding
+// NDB::FormatSettings::DateTimeFormat enumerator.
+//
+// Recognized names are "POSIX" and "ISO"; the match is exact and
+// case-sensitive. Any other string yields DateTimeFormat::Unspecified.
+static NDB::FormatSettings::DateTimeFormat ToDateTimeFormat(const std::string& formatName) {
+    // Read-only lookup table: declared const so it cannot be modified after
+    // its one-time initialization.
+    static const std::map<std::string, NDB::FormatSettings::DateTimeFormat> formats{
+        {"POSIX", NDB::FormatSettings::DateTimeFormat::POSIX},
+        {"ISO", NDB::FormatSettings::DateTimeFormat::ISO}
+    };
+    if (const auto it = formats.find(formatName); it != formats.end()) {
+        return it->second;
+    }
+    return NDB::FormatSettings::DateTimeFormat::Unspecified;
+}
+
+// Maps a timestamp format name (the value of the "data.timestamp.formatname"
+// JSON setting read by GetFormatSettings) to the corresponding
+// NDB::FormatSettings::TimestampFormat enumerator.
+//
+// Recognized names are "POSIX", "ISO", "UNIX_TIME_MILLISECONDS",
+// "UNIX_TIME_SECONDS" and "UNIX_TIME_MICROSECONDS"; the match is exact and
+// case-sensitive. Any other string yields TimestampFormat::Unspecified.
+static NDB::FormatSettings::TimestampFormat ToTimestampFormat(const std::string& formatName) {
+    // Read-only lookup table: declared const so it cannot be modified after
+    // its one-time initialization.
+    static const std::map<std::string, NDB::FormatSettings::TimestampFormat> formats{
+        {"POSIX", NDB::FormatSettings::TimestampFormat::POSIX},
+        {"ISO", NDB::FormatSettings::TimestampFormat::ISO},
+        {"UNIX_TIME_MILLISECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMilliseconds},
+        {"UNIX_TIME_SECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeSeconds},
+        {"UNIX_TIME_MICROSECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMicroSeconds}
+    };
+    if (const auto it = formats.find(formatName); it != formats.end()) {
+        return it->second;
+    }
+    return NDB::FormatSettings::TimestampFormat::Unspecified;
+}
+
NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
NDB::FormatSettings settings;
settings.skip_unknown_fields = true;
@@ -826,6 +851,26 @@ NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
}
settings.csv.delimiter = delimiter[0];
}
+
+ if (json.has("data.datetime.formatname")) {
+ auto formatName = json["data.datetime.formatname"].getString();
+ settings.date_time_format_name = ToDateTimeFormat(formatName);
+ }
+
+ if (json.has("data.datetime.format")) {
+ auto format = json["data.datetime.format"].getString();
+ settings.date_time_format = format;
+ }
+
+ if (json.has("data.timestamp.formatname")) {
+ auto formatName = json["data.timestamp.formatname"].getString();
+ settings.timestamp_format_name = ToTimestampFormat(formatName);
+ }
+
+ if (json.has("data.timestamp.format")) {
+ auto format = json["data.timestamp.format"].getString();
+ settings.timestamp_format = format;
+ }
}
return settings;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp
index fef732c0e11..c8f9c167f68 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp
@@ -237,7 +237,7 @@ ColumnWithTypeAndName ColumnFunction::reduce() const
if (is_function_compiled)
ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);
- res.column = function->execute(columns, res.type, size_);
+ res.column = function->execute(columns, res.type, size_, {});
return res;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h
index b5d76f216ae..f8595d8217d 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h
@@ -788,7 +788,8 @@ template <> struct Field::EnumToType<Field::Types::AggregateFunctionState> { usi
inline constexpr bool isInt64OrUInt64FieldType(Field::Types::Which t)
{
return t == Field::Types::Int64
- || t == Field::Types::UInt64;
+ || t == Field::Types::UInt64
+ || t == Field::Types::Decimal64;
}
// Field value getter with type checking in debug builds.
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp
index ff30075faf5..18320c6ac54 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp
@@ -19,6 +19,22 @@ namespace
inline void readText(time_t & x, ReadBuffer & istr, const FormatSettings & settings, const DateLUTImpl & time_zone, const DateLUTImpl & utc_time_zone)
{
+ if (!settings.date_time_format.empty()) {
+ readDateTimeTextFormat(x, istr, settings.date_time_format);
+ return;
+ }
+
+ switch (settings.date_time_format_name) {
+ case FormatSettings::DateTimeFormat::Unspecified:
+ break;
+ case FormatSettings::DateTimeFormat::ISO:
+ readDateTimeTextISO(x, istr);
+ return;
+ case FormatSettings::DateTimeFormat::POSIX:
+ readDateTimeTextPOSIX(x, istr);
+ return;
+ }
+
switch (settings.date_time_input_format)
{
case FormatSettings::DateTimeInputFormat::Basic:
@@ -41,6 +57,22 @@ SerializationDateTime::SerializationDateTime(
void SerializationDateTime::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
auto value = assert_cast<const ColumnType &>(column).getData()[row_num];
+ if (!settings.date_time_format.empty()) {
+ writeDateTimeTextFormat(value, ostr, settings.date_time_format);
+ return;
+ }
+
+ switch (settings.date_time_format_name) {
+ case FormatSettings::DateTimeFormat::Unspecified:
+ break;
+ case FormatSettings::DateTimeFormat::ISO:
+ writeDateTimeTextISO(value, ostr, utc_time_zone);
+ return;
+ case FormatSettings::DateTimeFormat::POSIX:
+ writeDateTimeTextPOSIX(value, ostr);
+ return;
+ }
+
switch (settings.date_time_output_format)
{
case FormatSettings::DateTimeOutputFormat::Simple:
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp
index eda2e54a22c..39e63f87d04 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp
@@ -26,6 +26,31 @@ SerializationDateTime64::SerializationDateTime64(
void SerializationDateTime64::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
auto value = assert_cast<const ColumnType &>(column).getData()[row_num];
+ if (!settings.timestamp_format.empty()) {
+ writeTimestampTextFormat(value, ostr, settings.timestamp_format);
+ return;
+ }
+
+ switch (settings.timestamp_format_name) {
+ case FormatSettings::TimestampFormat::Unspecified:
+ break;
+ case FormatSettings::TimestampFormat::ISO:
+ writeDateTimeTextISO(value, scale, ostr, utc_time_zone);
+ return;
+ case FormatSettings::TimestampFormat::POSIX:
+ writeTimestampTextPOSIX(value, ostr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMicroSeconds:
+ writeIntText(value.value, ostr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMilliseconds:
+ writeIntText(value.value / 1000, ostr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeSeconds:
+ writeIntText(value.value / 1000000, ostr);
+ return;
+ }
+
switch (settings.date_time_output_format)
{
case FormatSettings::DateTimeOutputFormat::Simple:
@@ -59,6 +84,35 @@ void SerializationDateTime64::serializeTextEscaped(const IColumn & column, size_
static inline void readText(DateTime64 & x, UInt32 scale, ReadBuffer & istr, const FormatSettings & settings, const DateLUTImpl & time_zone, const DateLUTImpl & utc_time_zone)
{
+ if (!settings.timestamp_format.empty()) {
+ readTimestampTextFormat(x, istr, settings.timestamp_format);
+ return;
+ }
+
+ switch (settings.timestamp_format_name) {
+ case FormatSettings::TimestampFormat::Unspecified:
+ break;
+ case FormatSettings::TimestampFormat::ISO:
+ readTimestampTextISO(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::POSIX:
+ readTimestampTextPOSIX(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMicroSeconds:
+ readIntText(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMilliseconds: {
+ readIntText(x, istr);
+ x *= 1000;
+ return;
+ }
+ case FormatSettings::TimestampFormat::UnixTimeSeconds: {
+ readIntText(x, istr);
+ x *= 1000000;
+ return;
+ }
+ }
+
switch (settings.date_time_input_format)
{
case FormatSettings::DateTimeInputFormat::Basic:
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h
index 47f6ee7bf27..d75c241d411 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h
@@ -29,6 +29,26 @@ struct FormatSettings
bool import_nested_json = false;
bool null_as_default = true;
bool decimal_trailing_zeros = false;
+ String date_time_format;
+ String timestamp_format;
+
+ // Named text format for DateTime values (stored in date_time_format_name).
+ // The DateTime serialization code consults it only when date_time_format is
+ // empty: ISO and POSIX select dedicated read/write routines, while
+ // Unspecified falls through to the date_time_input_format /
+ // date_time_output_format handling.
+ enum class DateTimeFormat {
+ Unspecified,
+ ISO,
+ POSIX
+ };
+
+ // Named text format for DateTime64 (timestamp) values (stored in
+ // timestamp_format_name). The DateTime64 serialization code consults it only
+ // when timestamp_format is empty; Unspecified falls through to the
+ // date_time_input_format / date_time_output_format handling.
+ // The UnixTime* variants are read and written as plain integers: the
+ // serializer writes the stored value as-is for UnixTimeMicroSeconds, divided
+ // by 1000 for UnixTimeMilliseconds and by 1000000 for UnixTimeSeconds.
+ enum class TimestampFormat {
+ Unspecified,
+ ISO,
+ POSIX,
+ UnixTimeMilliseconds,
+ UnixTimeSeconds,
+ UnixTimeMicroSeconds
+ };
+
+ DateTimeFormat date_time_format_name = DateTimeFormat::Unspecified;
+ TimestampFormat timestamp_format_name = TimestampFormat::Unspecified;
enum class DateTimeInputFormat
{
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h
index 99f14b67984..a4073f4637a 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h
@@ -2,6 +2,8 @@
#include <type_traits>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
#include <IO/WriteBufferFromVector.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/Operators.h>
@@ -883,13 +885,13 @@ struct ConvertImplGenericToString
/** Conversion of strings to numbers, dates, datetimes: through parsing.
*/
template <typename DataType>
-void parseImpl(typename DataType::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
+void parseImpl(typename DataType::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings)
{
readText(x, rb);
}
template <>
-inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
+inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings)
{
DayNum tmp(0);
readDateText(tmp, rb);
@@ -897,7 +899,7 @@ inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb
}
template <>
-inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
+inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings)
{
ExtendedDayNum tmp(0);
readDateText(tmp, rb);
@@ -906,9 +908,30 @@ inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer
// NOTE: no need of extra overload of DateTime64, since readDateTimeText64 has different signature and that case is explicitly handled in the calling code.
template <>
-inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone)
+inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone, const FormatSettings & format_settings)
{
time_t time = 0;
+ if (!format_settings.date_time_format.empty()) {
+ readDateTimeTextFormat(time, rb, format_settings.date_time_format);
+ x = time;
+ return;
+ }
+
+ switch (format_settings.date_time_format_name) {
+ case FormatSettings::DateTimeFormat::Unspecified:
+ break;
+ case FormatSettings::DateTimeFormat::ISO: {
+ readDateTimeTextISO(time, rb);
+ x = time;
+ return;
+ }
+ case FormatSettings::DateTimeFormat::POSIX: {
+ readDateTimeTextPOSIX(time, rb);
+ x = time;
+ return;
+ }
+ }
+
readDateTimeText(time, rb, *time_zone);
if (time < 0)
time = 0;
@@ -917,7 +940,7 @@ inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuf
template <>
-inline void parseImpl<DataTypeUUID>(DataTypeUUID::FieldType & x, ReadBuffer & rb, const DateLUTImpl *)
+inline void parseImpl<DataTypeUUID>(DataTypeUUID::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings)
{
UUID tmp;
readUUIDText(tmp, rb);
@@ -1016,6 +1039,39 @@ enum class ConvertFromStringParsingMode
BestEffortUS
};
+// Parses a textual timestamp from `istr` into `x` according to `settings`.
+// Precedence:
+//   1. a non-empty settings.timestamp_format is handed to readTimestampTextFormat;
+//   2. otherwise settings.timestamp_format_name selects a dedicated parser;
+//   3. if the name is Unspecified, falls back to readDateTime64Text.
+// NOTE(review): `scale` and `local_time_zone` are used only by the fallback in
+// step 3. The UnixTime* branches scale by fixed factors (1000 / 1000000), which
+// presumably assumes `x` is stored in microseconds (scale 6) -- confirm against callers.
+static inline void readTextTimestamp64(DateTime64 & x, UInt32 scale, ReadBuffer & istr, const DateLUTImpl & local_time_zone, const FormatSettings & settings)
+{
+ // An explicit format string takes priority over any named format.
+ if (!settings.timestamp_format.empty()) {
+ readTimestampTextFormat(x, istr, settings.timestamp_format);
+ return;
+ }
+
+ switch (settings.timestamp_format_name) {
+ case FormatSettings::TimestampFormat::Unspecified:
+ // No named format requested: leave the switch and use the fallback parser below.
+ break;
+ case FormatSettings::TimestampFormat::ISO:
+ readTimestampTextISO(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::POSIX:
+ readTimestampTextPOSIX(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMicroSeconds:
+ // The integer read from the input is stored without rescaling.
+ readIntText(x, istr);
+ return;
+ case FormatSettings::TimestampFormat::UnixTimeMilliseconds: {
+ // The integer read from the input is multiplied by 1000 before being stored.
+ readIntText(x, istr);
+ x *= 1000;
+ return;
+ }
+ case FormatSettings::TimestampFormat::UnixTimeSeconds: {
+ // The integer read from the input is multiplied by 1000000 before being stored.
+ readIntText(x, istr);
+ x *= 1000000;
+ return;
+ }
+ }
+ readDateTime64Text(x, scale, istr, local_time_zone);
+}
+
template <typename FromDataType, typename ToDataType, typename Name,
ConvertFromStringExceptionMode exception_mode, ConvertFromStringParsingMode parsing_mode>
struct ConvertThroughParsing
@@ -1046,7 +1102,7 @@ struct ConvertThroughParsing
template <typename Additions = void *>
static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & res_type, size_t input_rows_count,
- Additions additions [[maybe_unused]] = Additions())
+ Additions additions [[maybe_unused]] = Additions(), const FormatSettings & format_settings = {})
{
using ColVecTo = typename ToDataType::ColumnType;
@@ -1173,7 +1229,7 @@ struct ConvertThroughParsing
if constexpr (to_datetime64)
{
DateTime64 value = 0;
- readDateTime64Text(value, vec_to.getScale(), read_buffer, *local_time_zone);
+ readTextTimestamp64(value, vec_to.getScale(), read_buffer, *local_time_zone, format_settings);
vec_to[i] = value;
}
else if constexpr (IsDataTypeDecimal<ToDataType>)
@@ -1181,7 +1237,7 @@ struct ConvertThroughParsing
vec_to[i], read_buffer, ToDataType::maxPrecision(), vec_to.getScale());
else
{
- parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone);
+ parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone, format_settings);
}
}
@@ -1601,11 +1657,11 @@ public:
}
bool canBeExecutedOnDefaultArguments() const override { return false; }
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override
{
try
{
- return executeInternal(arguments, result_type, input_rows_count);
+ return executeInternal(arguments, result_type, input_rows_count, format_settings);
}
catch (Exception & e)
{
@@ -1649,7 +1705,7 @@ private:
mutable bool checked_return_type = false;
mutable bool to_nullable = false;
- ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
+ ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const
{
if (arguments.empty())
throw Exception{"Function " + getName() + " expects at least 1 argument",
@@ -1712,9 +1768,13 @@ private:
else
result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count);
}
+ else if constexpr (std::is_base_of_v<ConvertThroughParsing<LeftDataType, RightDataType, Name, ConvertFromStringExceptionMode::Throw, ConvertFromStringParsingMode::Normal>, ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>>)
+ {
+ result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count, {}, format_settings);
+ }
else
{
- result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count);
+ result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count/*, format_settings*/);
}
return true;
@@ -1890,30 +1950,30 @@ public:
}
template <typename ConvertToDataType>
- ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, UInt32 scale = 0) const
+ ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, UInt32 scale = 0) const
{
const IDataType * from_type = arguments[0].type.get();
if (checkAndGetDataType<DataTypeString>(from_type))
{
return ConvertThroughParsing<DataTypeString, ConvertToDataType, Name, exception_mode, parsing_mode>::execute(
- arguments, result_type, input_rows_count, scale);
+ arguments, result_type, input_rows_count, scale, format_settings);
}
else if (checkAndGetDataType<DataTypeFixedString>(from_type))
{
return ConvertThroughParsing<DataTypeFixedString, ConvertToDataType, Name, exception_mode, parsing_mode>::execute(
- arguments, result_type, input_rows_count, scale);
+ arguments, result_type, input_rows_count, scale, format_settings);
}
return nullptr;
}
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override
{
ColumnPtr result_column;
if constexpr (to_decimal)
- result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count,
+ result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count, format_settings,
assert_cast<const ToDataType &>(*removeNullable(result_type)).getScale());
else
{
@@ -1924,15 +1984,15 @@ public:
scale = extractToDecimalScale(arguments[1]);
if (scale == 0)
- result_column = executeInternal<DataTypeDateTime>(arguments, result_type, input_rows_count);
+ result_column = executeInternal<DataTypeDateTime>(arguments, result_type, input_rows_count, format_settings);
else
{
- result_column = executeInternal<DataTypeDateTime64>(arguments, result_type, input_rows_count, static_cast<UInt32>(scale));
+ result_column = executeInternal<DataTypeDateTime64>(arguments, result_type, input_rows_count, format_settings, static_cast<UInt32>(scale));
}
}
else
{
- result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count);
+ result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count, format_settings);
}
}
@@ -2391,7 +2451,7 @@ public:
String getName() const override { return name; }
protected:
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override
{
/// drop second argument, pass others
ColumnsWithTypeAndName new_arguments{arguments.front()};
@@ -2460,12 +2520,12 @@ public:
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getResultType() const override { return return_type; }
- ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*sample_columns*/) const override
+ ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*sample_columns*/, const FormatSettings & format_settings) const override
{
try
{
return std::make_unique<ExecutableFunctionCast>(
- prepareUnpackDictionaries(getArgumentTypes()[0], getResultType()), cast_name, diagnostic);
+ prepareUnpackDictionaries(getArgumentTypes()[0], getResultType(), format_settings), cast_name, diagnostic);
}
catch (Exception & e)
{
@@ -2503,14 +2563,14 @@ private:
std::optional<Diagnostic> diagnostic;
CastType cast_type;
- static WrapperType createFunctionAdaptor(FunctionPtr function, const DataTypePtr & from_type)
+ static WrapperType createFunctionAdaptor(FunctionPtr function, const DataTypePtr & from_type, const FormatSettings & format_settings)
{
auto function_adaptor = std::make_unique<FunctionToOverloadResolverAdaptor>(function)->build({ColumnWithTypeAndName{nullptr, from_type, ""}});
- return [function_adaptor]
+ return [function_adaptor, format_settings]
(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count)
{
- return function_adaptor->execute(arguments, result_type, input_rows_count);
+ return function_adaptor->execute(arguments, result_type, input_rows_count, format_settings);
};
}
@@ -2525,7 +2585,7 @@ private:
}
template <typename ToDataType>
- WrapperType createWrapper(const DataTypePtr & from_type, const ToDataType * const to_type, bool requested_result_is_nullable) const
+ WrapperType createWrapper(const DataTypePtr & from_type, const ToDataType * const to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const
{
TypeIndex from_type_index = from_type->getTypeId();
WhichDataType which(from_type_index);
@@ -2537,12 +2597,12 @@ private:
/// In case when converting to Nullable type, we apply different parsing rule,
/// that will not throw an exception but return NULL in case of malformed input.
FunctionPtr function = FunctionConvertFromString<ToDataType, FunctionName, ConvertFromStringExceptionMode::Null>::create();
- return createFunctionAdaptor(function, from_type);
+ return createFunctionAdaptor(function, from_type, format_settings);
}
else if (!can_apply_accurate_cast)
{
FunctionPtr function = FunctionTo<ToDataType>::Type::create();
- return createFunctionAdaptor(function, from_type);
+ return createFunctionAdaptor(function, from_type, format_settings);
}
auto wrapper_cast_type = cast_type;
@@ -2597,7 +2657,7 @@ private:
static WrapperType createStringWrapper(const DataTypePtr & from_type)
{
FunctionPtr function = FunctionToString::create();
- return createFunctionAdaptor(function, from_type);
+ return createFunctionAdaptor(function, from_type, {});
}
WrapperType createFixedStringWrapper(const DataTypePtr & from_type, const size_t N) const
@@ -2617,7 +2677,7 @@ private:
template <typename ToDataType>
std::enable_if_t<IsDataTypeDecimal<ToDataType>, WrapperType>
- createDecimalWrapper(const DataTypePtr & from_type, const ToDataType * to_type, bool requested_result_is_nullable) const
+ createDecimalWrapper(const DataTypePtr & from_type, const ToDataType * to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const
{
TypeIndex type_index = from_type->getTypeId();
UInt32 scale = to_type->getScale();
@@ -2636,7 +2696,7 @@ private:
auto wrapper_cast_type = cast_type;
- return [wrapper_cast_type, type_index, scale, to_type, requested_result_is_nullable]
+ return [wrapper_cast_type, type_index, scale, to_type, requested_result_is_nullable, format_settings]
(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *column_nullable, size_t input_rows_count)
{
ColumnPtr result_column;
@@ -2680,8 +2740,11 @@ private:
return true;
}
}
-
- result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale);
+ if constexpr (std::is_base_of_v<ConvertThroughParsing<LeftDataType, RightDataType, FunctionName, ConvertFromStringExceptionMode::Throw, ConvertFromStringParsingMode::Normal>, ConvertImpl<LeftDataType, RightDataType, FunctionName>>) {
+ result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale, format_settings);
+ } else {
+ result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale);
+ }
return true;
});
@@ -2971,7 +3034,7 @@ private:
else if (isNativeNumber(from_type) || isEnum(from_type))
{
auto function = Function::create();
- return createFunctionAdaptor(function, from_type);
+ return createFunctionAdaptor(function, from_type, {});
}
else
{
@@ -3075,7 +3138,7 @@ private:
};
}
- WrapperType prepareUnpackDictionaries(const DataTypePtr & from_type, const DataTypePtr & to_type) const
+ WrapperType prepareUnpackDictionaries(const DataTypePtr & from_type, const DataTypePtr & to_type, const FormatSettings & format_settings = {}) const
{
const auto * from_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(from_type.get());
const auto * to_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(to_type.get());
@@ -3108,7 +3171,7 @@ private:
/// Disable check for dictionary. Will check that column doesn't contain NULL in wrapper below.
skip_not_null_check = true;
- auto wrapper = prepareRemoveNullable(from_nested, to_nested, skip_not_null_check);
+ auto wrapper = prepareRemoveNullable(from_nested, to_nested, skip_not_null_check, format_settings);
if (!from_low_cardinality && !to_low_cardinality)
return wrapper;
@@ -3177,14 +3240,14 @@ private:
};
}
- WrapperType prepareRemoveNullable(const DataTypePtr & from_type, const DataTypePtr & to_type, bool skip_not_null_check) const
+ WrapperType prepareRemoveNullable(const DataTypePtr & from_type, const DataTypePtr & to_type, bool skip_not_null_check, const FormatSettings & format_settings) const
{
/// Determine whether pre-processing and/or post-processing must take place during conversion.
bool source_is_nullable = from_type->isNullable();
bool result_is_nullable = to_type->isNullable();
- auto wrapper = prepareImpl(removeNullable(from_type), removeNullable(to_type), result_is_nullable);
+ auto wrapper = prepareImpl(removeNullable(from_type), removeNullable(to_type), result_is_nullable, format_settings);
if (result_is_nullable)
{
@@ -3255,7 +3318,7 @@ private:
/// 'from_type' and 'to_type' are nested types in case of Nullable.
/// 'requested_result_is_nullable' is true if CAST to Nullable type is requested.
- WrapperType prepareImpl(const DataTypePtr & from_type, const DataTypePtr & to_type, bool requested_result_is_nullable) const
+ WrapperType prepareImpl(const DataTypePtr & from_type, const DataTypePtr & to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const
{
if (from_type->equals(*to_type))
return createIdentityWrapper(from_type);
@@ -3289,7 +3352,7 @@ private:
std::is_same_v<ToDataType, DataTypeDateTime> ||
std::is_same_v<ToDataType, DataTypeUUID>)
{
- ret = createWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable);
+ ret = createWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable, format_settings);
return true;
}
if constexpr (
@@ -3306,7 +3369,7 @@ private:
std::is_same_v<ToDataType, DataTypeDecimal<Decimal256>> ||
std::is_same_v<ToDataType, DataTypeDateTime64>)
{
- ret = createDecimalWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable);
+ ret = createDecimalWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable, format_settings);
return true;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h
index 81d77b99c12..d848c611d81 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h
@@ -35,7 +35,7 @@ public:
String getName() const override { return "FunctionExpression"; }
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, const FormatSettings & format_settings) const override
{
Block expr_columns;
for (size_t i = 0; i < arguments.size(); ++i)
@@ -82,7 +82,7 @@ public:
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getResultType() const override { return return_type; }
- ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
+ ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &, const FormatSettings & format_settings = {}) const override
{
return std::make_unique<ExecutableFunctionExpression>(expression_actions, signature);
}
@@ -120,7 +120,7 @@ public:
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count, const FormatSettings & format_settings) const override
{
Names names;
DataTypes types;
@@ -175,7 +175,7 @@ public:
const DataTypes & getArgumentTypes() const override { return capture->captured_types; }
const DataTypePtr & getResultType() const override { return return_type; }
- ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
+ ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &, const FormatSettings & format_settings = {}) const override
{
return std::make_unique<ExecutableFunctionCapture>(expression_actions, capture);
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp
index 3f1e989db62..60fbdcfbc1b 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp
@@ -115,7 +115,7 @@ void convertLowCardinalityColumnsToFull(ColumnsWithTypeAndName & args)
}
ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const
{
ColumnNumbers arguments_to_remain_constants = getArgumentsThatAreAlwaysConstant();
@@ -158,7 +158,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments(
"Number of arguments for function {} doesn't match: the function requires more arguments",
getName());
- ColumnPtr result_column = executeWithoutLowCardinalityColumns(temporary_columns, result_type, 1, dry_run);
+ ColumnPtr result_column = executeWithoutLowCardinalityColumns(temporary_columns, result_type, 1, format_settings, dry_run);
/// extremely rare case, when we have function with completely const arguments
/// but some of them produced by non isDeterministic function
@@ -170,7 +170,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments(
ColumnPtr IExecutableFunction::defaultImplementationForNulls(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const
{
if (args.empty() || !useDefaultImplementationForNulls())
return nullptr;
@@ -194,7 +194,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForNulls(
ColumnsWithTypeAndName temporary_columns = createBlockWithNestedColumns(args);
auto temporary_result_type = removeNullable(result_type);
- auto res = executeWithoutLowCardinalityColumns(temporary_columns, temporary_result_type, input_rows_count, dry_run);
+ auto res = executeWithoutLowCardinalityColumns(temporary_columns, temporary_result_type, input_rows_count, format_settings, dry_run);
return wrapInNullable(res, args, result_type, input_rows_count);
}
@@ -202,19 +202,19 @@ ColumnPtr IExecutableFunction::defaultImplementationForNulls(
}
ColumnPtr IExecutableFunction::executeWithoutLowCardinalityColumns(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const
{
- if (auto res = defaultImplementationForConstantArguments(args, result_type, input_rows_count, dry_run))
+ if (auto res = defaultImplementationForConstantArguments(args, result_type, input_rows_count, format_settings, dry_run))
return res;
- if (auto res = defaultImplementationForNulls(args, result_type, input_rows_count, dry_run))
+ if (auto res = defaultImplementationForNulls(args, result_type, input_rows_count, format_settings, dry_run))
return res;
ColumnPtr res;
if (dry_run)
- res = executeDryRunImpl(args, result_type, input_rows_count);
+ res = executeDryRunImpl(args, result_type, input_rows_count, format_settings);
else
- res = executeImpl(args, result_type, input_rows_count);
+ res = executeImpl(args, result_type, input_rows_count, format_settings);
if (!res)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty column was returned by function {}", getName());
@@ -222,7 +222,7 @@ ColumnPtr IExecutableFunction::executeWithoutLowCardinalityColumns(
return res;
}
-ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
+ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const
{
ColumnPtr result;
@@ -242,7 +242,7 @@ ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments,
? input_rows_count
: columns_without_low_cardinality.front().column->size();
- auto res = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, dictionary_type, new_input_rows_count, dry_run);
+ auto res = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, dictionary_type, new_input_rows_count, format_settings, dry_run);
auto keys = res->convertToFullColumnIfConst();
auto res_mut_dictionary = DataTypeLowCardinality::createColumnUnique(*res_low_cardinality_type->getDictionaryType());
@@ -257,11 +257,11 @@ ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments,
else
{
convertLowCardinalityColumnsToFull(columns_without_low_cardinality);
- result = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, result_type, input_rows_count, dry_run);
+ result = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, result_type, input_rows_count, format_settings, dry_run);
}
}
else
- result = executeWithoutLowCardinalityColumns(arguments, result_type, input_rows_count, dry_run);
+ result = executeWithoutLowCardinalityColumns(arguments, result_type, input_rows_count, format_settings, dry_run);
return result;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h
index 98a71698aff..af36eda9389 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h
@@ -4,6 +4,7 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/Names.h>
#include <DataTypes/IDataType.h>
+#include <Formats/FormatSettings.h>
//#if !defined(ARCADIA_BUILD)
//# include "config_core.h"
@@ -46,15 +47,15 @@ public:
/// Get the main function name.
virtual String getName() const = 0;
- ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
+ ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const;
protected:
- virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const = 0;
+ virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const = 0;
- virtual ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
+ virtual ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const
{
- return executeImpl(arguments, result_type, input_rows_count);
+ return executeImpl(arguments, result_type, input_rows_count, format_settings);
}
/** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following:
@@ -90,13 +91,13 @@ protected:
private:
ColumnPtr defaultImplementationForConstantArguments(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const;
ColumnPtr defaultImplementationForNulls(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const;
ColumnPtr executeWithoutLowCardinalityColumns(
- const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
+ const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const;
};
@@ -114,9 +115,9 @@ public:
virtual ~IFunctionBase() = default;
virtual ColumnPtr execute(
- const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run = false) const
+ const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run = false) const
{
- return prepare(arguments)->execute(arguments, result_type, input_rows_count, dry_run);
+ return prepare(arguments, format_settings)->execute(arguments, result_type, input_rows_count, format_settings, dry_run);
}
/// Get the main function name.
@@ -127,7 +128,7 @@ public:
/// Do preparations and return executable.
/// sample_columns should contain data types of arguments and values of constants, if relevant.
- virtual ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & arguments) const = 0;
+ virtual ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & arguments, const FormatSettings & format_settings = {}) const = 0;
#if USE_EMBEDDED_COMPILER
@@ -375,10 +376,10 @@ public:
virtual String getName() const = 0;
- virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const = 0;
- virtual ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
+ virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings = {}) const = 0;
+ virtual ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings = {}) const
{
- return executeImpl(arguments, result_type, input_rows_count);
+ return executeImpl(arguments, result_type, input_rows_count, format_settings);
}
/** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following:
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h
index a6cc0c907bf..865f4495738 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h
@@ -16,14 +16,14 @@ public:
protected:
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const final
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const final
{
- return function->executeImpl(arguments, result_type, input_rows_count);
+ return function->executeImpl(arguments, result_type, input_rows_count, format_settings);
}
- ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const final
+ ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const final
{
- return function->executeImplDryRun(arguments, result_type, input_rows_count);
+ return function->executeImplDryRun(arguments, result_type, input_rows_count, format_settings);
}
bool useDefaultImplementationForNulls() const final { return function->useDefaultImplementationForNulls(); }
@@ -60,7 +60,7 @@ public:
#endif
- ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*arguments*/) const override
+ ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*arguments*/, const FormatSettings & format_settings = {}) const override
{
return std::make_unique<FunctionToExecutableFunctionAdaptor>(function);
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h
index 8148cf3a27a..6ecc168f8d5 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h
@@ -60,7 +60,7 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
- ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, const FormatSettings & format_settings) const override
{
const auto n = arguments[1].column->getUInt(0);
return executeForN<ConvertToFixedStringExceptionMode::Throw>(arguments, n);
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp
index 016c9cb69c7..879b6f0dfe1 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp
@@ -10,6 +10,8 @@
#include <common/find_symbols.h>
#include <stdlib.h>
+#include <util/datetime/base.h>
+
#ifdef __SSE2__
#include <emmintrin.h>
#endif
@@ -1166,4 +1168,68 @@ bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current)
return loaded_more;
}
+/// Reads an ISO 8601 date-time from the current read position of `istr` and stores it in `x`
+/// as the parsed TInstant value divided by 1000000.
+///
+/// Only fixed-width candidates of 25 and 20 bytes are tried, longest first; on success the read
+/// position is advanced by the matched width.
+/// NOTE(review): presumably these widths correspond to "YYYY-MM-DDThh:mm:ss+hh:mm" and
+/// "YYYY-MM-DDThh:mm:ssZ" - confirm against the inputs that must be supported.
+/// NOTE(review): only bytes already in the working buffer are inspected; a value that straddles
+/// a buffer refill is not handled here.
+///
+/// Throws yexception when none of the candidates parses.
+void readDateTimeTextISO(time_t & x, ReadBuffer & istr) {
+    /// Compare sizes instead of forming `position() + width`, which could point past the buffer end.
+    const size_t remaining = static_cast<size_t>(istr.buffer().end() - istr.position());
+    for (const size_t width : {size_t{25}, size_t{20}}) {
+        if (width > remaining) {
+            continue;
+        }
+        TInstant result;
+        if (TInstant::TryParseIso8601(TStringBuf{istr.position(), istr.position() + width}, result)) {
+            x = result.GetValue() / 1000000;
+            istr.position() += width;
+            return;
+        }
+    }
+    /// The working buffer is not guaranteed to be NUL-terminated, so report a bounded view of the
+    /// pending bytes rather than streaming the raw `char *`. `ythrow` matches the sibling readers.
+    ythrow yexception() << "error in datetime parsing. Input data: " << TStringBuf{istr.position(), istr.buffer().end()};
+}
+
+/// Parses a date-time at the current read position of `istr` using the strptime(3) pattern
+/// `format`, and stores TimeGM() of the parsed fields in `x`.
+///
+/// Fields that `format` does not set keep their defaults: zero, except day of month, which is 1.
+/// On success the read position is advanced past the consumed characters.
+/// At most `max_text_size` pending bytes are considered, so a single value may not be longer.
+///
+/// Throws yexception when the text does not match `format`.
+void readDateTimeTextFormat(time_t & x, ReadBuffer & istr, const String& format)
+{
+    /// strptime() requires a NUL-terminated string, which the working buffer does not guarantee.
+    /// Parse a bounded, terminated copy of the pending bytes; the bound keeps the per-value copy cheap.
+    constexpr size_t max_text_size = 256;
+    const size_t remaining = static_cast<size_t>(istr.buffer().end() - istr.position());
+    const String text(istr.position(), remaining < max_text_size ? remaining : max_text_size);
+
+    struct tm input_tm{};
+    input_tm.tm_mday = 1;
+    const char * parsed_end = strptime(text.c_str(), format.c_str(), &input_tm);
+    if (parsed_end == nullptr) {
+        ythrow yexception() << "Can't parse date " << text << " in " << format << " format";
+    }
+    /// Translate the offset consumed in the copy back onto the real buffer.
+    istr.position() += parsed_end - text.c_str();
+    x = TimeGM(&input_tm);
+}
+
+/// Reads a date-time from `istr` into `x` using the fixed pattern "%Y-%m-%d %H:%M:%S";
+/// parsing and error reporting are those of readDateTimeTextFormat().
+void readDateTimeTextPOSIX(time_t & x, ReadBuffer & istr)
+{
+    const String posix_pattern("%Y-%m-%d %H:%M:%S");
+    readDateTimeTextFormat(x, istr, posix_pattern);
+}
+
+/// Reads an ISO 8601 timestamp from the current read position of `istr` and stores the parsed
+/// TInstant value, unscaled, in `x`.
+///
+/// Only fixed-width candidates of 27, 25, 24 and 20 bytes are tried, longest first; on success
+/// the read position is advanced by the matched width.
+/// NOTE(review): presumably the widths cover the variants with and without fractional seconds
+/// and with a "Z" or "+hh:mm" suffix - confirm against the inputs that must be supported.
+/// NOTE(review): only bytes already in the working buffer are inspected; a value that straddles
+/// a buffer refill is not handled here.
+///
+/// Throws yexception when none of the candidates parses.
+void readTimestampTextISO(DateTime64 & x, ReadBuffer & istr) {
+    /// Compare sizes instead of forming `position() + width`, which could point past the buffer end.
+    const size_t remaining = static_cast<size_t>(istr.buffer().end() - istr.position());
+    for (const size_t width : {size_t{27}, size_t{25}, size_t{24}, size_t{20}}) {
+        if (width > remaining) {
+            continue;
+        }
+        TInstant result;
+        if (TInstant::TryParseIso8601(TStringBuf{istr.position(), istr.position() + width}, result)) {
+            x = result.GetValue();
+            istr.position() += width;
+            return;
+        }
+    }
+    /// The working buffer is not guaranteed to be NUL-terminated, so report a bounded view of the
+    /// pending bytes rather than streaming the raw `char *`. `ythrow` matches the sibling readers.
+    ythrow yexception() << "error in datetime parsing. Input data: " << TStringBuf{istr.position(), istr.buffer().end()};
+}
+
+/// Parses a timestamp at the current read position of `istr` using the strptime(3) pattern
+/// `format` and stores it in `x` as whole seconds multiplied by 1000000.
+///
+/// The parsing itself (field defaults, advancing the read position, the yexception thrown on a
+/// mismatch) is delegated to readDateTimeTextFormat() so the two readers cannot drift apart.
+void readTimestampTextFormat(DateTime64 & x, ReadBuffer & istr, const String& format)
+{
+    time_t seconds = 0;
+    readDateTimeTextFormat(seconds, istr, format);
+    x = seconds * 1000000;
+}
+
+/// Reads a timestamp from `istr` into `x` using the fixed pattern "%Y-%m-%d %H:%M:%S";
+/// parsing and error reporting are those of readTimestampTextFormat().
+void readTimestampTextPOSIX(DateTime64 & x, ReadBuffer & istr)
+{
+    const String posix_pattern("%Y-%m-%d %H:%M:%S");
+    readTimestampTextFormat(x, istr, posix_pattern);
+}
+
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h
index e88d3d7cf52..29cf4c5af40 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h
@@ -886,6 +886,18 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf)
datetime.second((s[17] - '0') * 10 + (s[18] - '0'));
}
+void readDateTimeTextISO(time_t & x, ReadBuffer & istr);
+
+void readDateTimeTextFormat(time_t & x, ReadBuffer & istr, const String& format);
+
+void readDateTimeTextPOSIX(time_t & x, ReadBuffer & istr);
+
+void readTimestampTextISO(DateTime64 & x, ReadBuffer & istr);
+
+void readTimestampTextFormat(DateTime64 & x, ReadBuffer & istr, const String& format);
+
+void readTimestampTextPOSIX(DateTime64 & x, ReadBuffer & istr);
+
/// Generic methods to read value in native binary format.
template <typename T>
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp
index 5dd56030ab8..51e321f0f7a 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp
@@ -3,6 +3,9 @@
#include <Common/hex.h>
#include <Common/StringUtils/StringUtils.h>
+#include <util/datetime/base.h>
+#include <util/generic/yexception.h>
+#include <util/memory/tempbuf.h>
namespace NDB
{
@@ -98,4 +101,43 @@ void writePointerHex(const void * ptr, WriteBuffer & buf)
buf.write(hex_str, 2 * sizeof(ptr));
}
+/// Formats `datetime` (seconds since the epoch, broken down with gmtime_r, i.e. as UTC) with the
+/// strftime(3) pattern `format` and appends the result to `buf`.
+///
+/// Throws yexception when the time cannot be broken down, or when strftime() produces no output
+/// (which includes an empty `format`).
+void writeDateTimeTextFormat(time_t datetime, WriteBuffer & buf, const String& format)
+{
+    struct tm tm{};
+    if (gmtime_r(&datetime, &tm) == nullptr) {
+        ythrow yexception() << "Can't convert timestamp " << datetime << " to calendar time";
+    }
+
+    /// strftime() returns 0 both for "buffer too small" and for an empty result, so a single
+    /// attempt with a guessed size rejects valid patterns whose expansion is long (e.g. several
+    /// "%c"). Retry with a doubled buffer up to a limit before giving up.
+    size_t size = Max<size_t>(format.size() * 2 + 1, 107);
+    const size_t size_limit = Max<size_t>(size, 64 * 1024);
+    if (!format.empty()) {
+        for (; size <= size_limit; size *= 2) {
+            TTempBuf tmp(size);
+            const size_t written = strftime(tmp.Data(), tmp.Size(), format.c_str(), &tm);
+            if (written > 0) {
+                buf.write(tmp.Data(), written);
+                return;
+            }
+        }
+    }
+    ythrow yexception() << "Can't format date in format " << format;
+}
+
+/// Appends `datetime` to `buf` using the fixed pattern "%Y-%m-%d %H:%M:%S";
+/// formatting and error reporting are those of writeDateTimeTextFormat().
+void writeDateTimeTextPOSIX(time_t datetime, WriteBuffer & buf)
+{
+    const String posix_pattern("%Y-%m-%d %H:%M:%S");
+    writeDateTimeTextFormat(datetime, buf, posix_pattern);
+}
+
+/// Formats `datetime64` with the strftime(3) pattern `format` and appends the result to `buf`.
+///
+/// The value is divided by 1000000 to obtain whole seconds; the sub-second part is dropped.
+/// Formatting and error reporting are delegated to writeDateTimeTextFormat() so the two writers
+/// cannot drift apart.
+void writeTimestampTextFormat(DateTime64 datetime64, WriteBuffer & buf, const String& format)
+{
+    const time_t seconds = datetime64 / 1000000;
+    writeDateTimeTextFormat(seconds, buf, format);
+}
+
+/// Appends `datetime` to `buf` using the fixed pattern "%Y-%m-%d %H:%M:%S";
+/// formatting and error reporting are those of writeTimestampTextFormat().
+void writeTimestampTextPOSIX(DateTime64 datetime, WriteBuffer & buf)
+{
+    const String posix_pattern("%Y-%m-%d %H:%M:%S");
+    writeTimestampTextFormat(datetime, buf, posix_pattern);
+}
+
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h
index 1a15890b50f..8275fe6d4da 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h
@@ -858,6 +858,14 @@ inline void writeDateTimeUnixTimestamp(DateTime64 datetime64, UInt32 scale, Writ
}
}
+void writeDateTimeTextFormat(time_t datetime, WriteBuffer & buf, const String& format);
+
+void writeDateTimeTextPOSIX(time_t datetime, WriteBuffer & buf);
+
+void writeTimestampTextFormat(DateTime64 datetime64, WriteBuffer & buf, const String& format);
+
+void writeTimestampTextPOSIX(DateTime64 datetime, WriteBuffer & buf);
+
/// Methods for output in binary format.
template <typename T>
inline std::enable_if_t<is_arithmetic_v<T>, void>
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp
index 6cdf67620d8..79bde3e54d0 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp
@@ -7,7 +7,7 @@ namespace NDB
{
template <CastType cast_type = CastType::nonAccurate>
-static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
+static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings)
{
if (arg.type->equals(*type))
return arg.column;
@@ -28,27 +28,27 @@ static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr
if constexpr (cast_type == CastType::accurateOrNull)
{
- return func_cast->execute(arguments, makeNullable(type), arg.column->size());
+ return func_cast->execute(arguments, makeNullable(type), arg.column->size(), format_settings);
}
else
{
- return func_cast->execute(arguments, type, arg.column->size());
+ return func_cast->execute(arguments, type, arg.column->size(), format_settings);
}
}
-ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
+ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings)
{
- return castColumn<CastType::nonAccurate>(arg, type);
+ return castColumn<CastType::nonAccurate>(arg, type, format_settings);
}
-ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
+/// Casts `arg` to `type` with CastType::accurate.
+/// `format_settings` is forwarded to the cast (as castColumn() does) instead of being replaced
+/// by default-constructed settings, so the caller's settings are honoured.
+ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings)
{
- return castColumn<CastType::accurate>(arg, type);
+    return castColumn<CastType::accurate>(arg, type, format_settings);
}
-ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type)
+/// Casts `arg` to `type` with CastType::accurateOrNull.
+/// `format_settings` is forwarded to the cast (as castColumn() does) instead of being replaced
+/// by default-constructed settings, so the caller's settings are honoured.
+ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings)
{
- return castColumn<CastType::accurateOrNull>(arg, type);
+    return castColumn<CastType::accurateOrNull>(arg, type, format_settings);
}
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h
index 2378ba9e05e..86560e58ec5 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h
@@ -5,8 +5,8 @@
namespace NDB
{
-ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
-ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
-ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type);
+ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings);
+ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings);
+ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings);
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
index ad76ba146a0..474b7391203 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
@@ -515,7 +515,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
{
}
-void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
+void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, const FormatSettings & format_settings)
{
Columns columns_list;
UInt64 num_rows = 0;
@@ -571,7 +571,7 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
else
column = readColumnFromArrowColumn(arrow_column, header_column.name, format_name, false, dictionary_values);
- column.column = castColumn(column, header_column.type);
+ column.column = castColumn(column, header_column.type, format_settings);
column.type = header_column.type;
num_rows = column.column->size();
columns_list.push_back(std::move(column.column));
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
index 5534eac8532..cfc12133980 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
@@ -27,7 +27,7 @@ public:
/// data from file without knowing table structure.
ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_);
- void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
+ void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, const FormatSettings & format_settings);
private:
const Block header;
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index c2d6ee1b0ea..de3bfdb8a60 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -54,7 +54,7 @@ Chunk ParquetBlockInputFormat::generate()
++row_group_current;
- arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
+ arrow_column_to_ch_column->arrowTableToCHChunk(res, table, format_settings);
return res;
}