diff options
author | hcpp <hcpp@ydb.tech> | 2022-12-10 09:57:44 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-12-10 09:57:44 +0300 |
commit | a8aefff09fd4d9e76262100a23301e19259f8a3c (patch) | |
tree | 8ab22dc7b2f107b7ac5a79f80e8d9cea3252fb5c | |
parent | 1bb7312173c0162299689aaecb798070864bd9b8 (diff) | |
download | ydb-a8aefff09fd4d9e76262100a23301e19259f8a3c.tar.gz |
format setting has been supported for timestamp and datetime
40 files changed, 897 insertions, 111 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp index 75b3c3e7942..a9ccae0b9a2 100644 --- a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp @@ -105,8 +105,60 @@ NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& se return issues; } +NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) { + NYql::TIssues issues; + TSet<TString> conflictingKeys; + for (const auto& [key, value]: formatSetting) { + if (key == "data.datetime.format_name"sv) { + if (!IsValidDateTimeFormatName(value)) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.datetime.format_name " + value)); + } + if (conflictingKeys.contains("data.datetime.format")) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); + } + conflictingKeys.insert("data.datetime.format_name"); + continue; + } + + if (key == "data.datetime.format"sv) { + if (conflictingKeys.contains("data.datetime.format_name")) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); + } + conflictingKeys.insert("data.datetime.format"); + continue; + } + + if (key == "data.timestamp.format_name"sv) { + if (!IsValidTimestampFormatName(value)) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.timestamp.format_name " + value)); + } + if (conflictingKeys.contains("data.timestamp.format")) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); + } + conflictingKeys.insert("data.timestamp.format_name"); + continue; + } + + if (key == "data.timestamp.format"sv) { + if (conflictingKeys.contains("data.timestamp.format_name")) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); + } + conflictingKeys.insert("data.timestamp.format"); + continue; + } + + if (matchAllSettings) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); + } + } + return issues; +} + + NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) { NYql::TIssues issues; + TSet<TString> conflictingKeys; + issues.AddIssues(ValidateDateFormatSetting(formatSetting)); for (const auto& [key, value]: formatSetting) { if (key == "file_pattern"sv) { continue; @@ -118,6 +170,11 @@ NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobu } continue; } + + if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) { + continue; + } + if (key == "csv_delimiter"sv) { if (format != "csv_with_names"sv) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names")); diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h index 849e61cb5c5..98438c38d17 100644 --- a/ydb/core/yq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h @@ -78,6 +78,8 @@ NYql::TIssues ValidateQuery(const T& ev, size_t maxSize) NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting); + +NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false); NYql::TIssues ValidateProjectionColumns(const YandexQuery::Schema& schema, const TVector<TString>& partitionedBy); template<typename T> @@ -111,7 +113,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<YandexQuer if (!dataStreams.has_schema()) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden")); } - issues.AddIssues(ValidateFormatSetting(dataStreams.format(), dataStreams.format_setting())); + issues.AddIssues(ValidateDateFormatSetting(dataStreams.format_setting(), true)); break; } case YandexQuery::BindingSetting::BINDING_NOT_SET: { diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index 169194601dc..f32f8b8be54 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -189,7 +189,7 @@ std::pair<TString, TString> SplitId(const TString& id, char delim) { } bool IsValidIntervalUnit(const TString& unit) { - static constexpr std::array<std::string_view, 10> IntervalUnits = { + static constexpr std::array<std::string_view, 7> IntervalUnits = { "MICROSECONDS"sv, "MILLISECONDS"sv, "SECONDS"sv, @@ -201,4 +201,23 @@ bool IsValidIntervalUnit(const TString& unit) { return IsIn(IntervalUnits, unit); } +bool IsValidDateTimeFormatName(const TString& formatName) { + static constexpr std::array<std::string_view, 2> FormatNames = { + "POSIX"sv, + "ISO"sv + }; + return IsIn(FormatNames, formatName); +} + +bool IsValidTimestampFormatName(const TString& formatName) { + static constexpr std::array<std::string_view, 5> FormatNames = { + "POSIX"sv, + "ISO"sv, + "UNIX_TIME_MILLISECONDS"sv, + "UNIX_TIME_SECONDS"sv, + "UNIX_TIME_MICROSECONDS"sv + }; + return IsIn(FormatNames, formatName); +} + } //namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h index ca100195757..af8dbd27978 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.h +++ b/ydb/core/yq/libs/control_plane_storage/util.h @@ -48,4 +48,8 @@ std::pair<TString, TString> SplitId(const TString& id, char delim = '-'); bool IsValidIntervalUnit(const TString& unit); +bool IsValidDateTimeFormatName(const TString& formatName); + +bool IsValidTimestampFormatName(const TString& formatName); + } // namespace NYq diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index 5fbbc9df26b..cbe39bbc3e5 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -143,6 +143,7 @@ TRuntimeNode BuildParseCall( std::unordered_map<TString, ui32>&& metadataColumns, const std::string_view& format, const std::string_view& compression, + const std::vector<std::pair<std::string_view, std::string_view>>& formatSettings, TType* inputType, TType* parseItemType, TType* finalItemType, @@ -274,13 +275,28 @@ TRuntimeNode BuildParseCall( inputDataType = inputItemTuple->GetElementType(0); } + TString settingsAsJson; + TStringOutput stream(settingsAsJson); + NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); + writer.OpenMap(); + + for (const auto& v : formatSettings) { + writer.Write(v.first, v.second); + } + + writer.CloseMap(); + writer.Flush(); + if (settingsAsJson == "{}") { + settingsAsJson.clear(); + } + const auto userType = ctx.ProgramBuilder.NewTupleType({ ctx.ProgramBuilder.NewTupleType({inputType}), ctx.ProgramBuilder.NewStructType({}), userOutputType}); input = TType::EKind::Resource == inputDataType->GetKind() ? ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseBlocks", {}, userType), {input})): - ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format), {input})); + ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format + settingsAsJson), {input})); } return ctx.ProgramBuilder.ExpandMap(input, @@ -348,6 +364,13 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon } } + std::vector<std::pair<std::string_view, std::string_view>> formatSettings; + if (auto settings = GetSetting(wrapper.Settings().Cast().Ref(), "formatSettings")) { + settings->Tail().ForEachChild([&](const TExprNode& v) { + formatSettings.emplace_back(v.Child(0)->Content(), v.Child(1)->Content()); + }); + } + auto parsedItems = rowType->GetItems(); EraseIf(parsedItems, [extraType, &metadataColumns](const auto& item) { return extraType && extraType->FindItem(item->GetName()) || metadataColumns.contains(TString(item->GetName())); @@ -372,6 +395,7 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon std::move(metadataColumns), format.Content() + settings.front(), settings.back(), + formatSettings, inputType, parseItemType, finalItemType, diff --git a/ydb/library/yql/providers/common/mkql/parser.h b/ydb/library/yql/providers/common/mkql/parser.h index 30d023b2a0a..cf35d6447bd 100644 --- a/ydb/library/yql/providers/common/mkql/parser.h +++ b/ydb/library/yql/providers/common/mkql/parser.h @@ -20,6 +20,7 @@ NKikimr::NMiniKQL::TRuntimeNode BuildParseCall( std::unordered_map<TString, ui32>&& metadataColumns, const std::string_view& format, const std::string_view& compression, + const std::vector<std::pair<std::string_view, std::string_view>>& formatSettings, NKikimr::NMiniKQL::TType* inputType, NKikimr::NMiniKQL::TType* parseItemType, NKikimr::NMiniKQL::TType* finalItemType, diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 16e5129cf0c..ffc54481edc 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -49,7 +49,7 @@ namespace { "bzip2"sv, "xz"sv }; - constexpr std::array<std::string_view, 10> IntervalUnits = { + constexpr std::array<std::string_view, 7> IntervalUnits = { "MICROSECONDS"sv, "MILLISECONDS"sv, "SECONDS"sv, @@ -58,6 +58,18 @@ namespace { "DAYS"sv, "WEEKS"sv }; + constexpr std::array<std::string_view, 2> DateTimeFormatNames = { + "POSIX"sv, + "ISO"sv + }; + + constexpr std::array<std::string_view, 5> TimestampFormatNames = { + "POSIX"sv, + "ISO"sv, + "UNIX_TIME_MILLISECONDS"sv, + "UNIX_TIME_SECONDS"sv, + "UNIX_TIME_MICROSECONDS"sv + }; } // namespace bool TCommitSettings::EnsureModeEmpty(TExprContext& ctx) { @@ -1163,14 +1175,27 @@ bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) { return false; } -bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) { - if (unit.empty() || IsIn(IntervalUnits, unit)) { +template<typename T> +bool ValidateValueInDictionary(std::string_view value, TExprContext& ctx, const T& dictionary) { + if (value.empty() || IsIn(dictionary, value)) { return true; } - ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << unit - << ". Use one of: " << JoinSeq(", ", IntervalUnits))); + ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << value + << ". Use one of: " << JoinSeq(", ", dictionary))); return false; } +bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) { + return ValidateValueInDictionary(unit, ctx, IntervalUnits); +} + +bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx) { + return ValidateValueInDictionary(formatName, ctx, DateTimeFormatNames); +} + +bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx) { + return ValidateValueInDictionary(formatName, ctx, TimestampFormatNames); +} + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index cf2f257aac7..87242eab37f 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -141,7 +141,9 @@ bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ct bool ValidateFormatForInput(std::string_view format, TExprContext& ctx); bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx); -bool ValidateIntervalUnit(std::string_view format, TExprContext& ctx); +bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx); +bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx); +bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx); } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json index bad47e9d70e..e43a69ba175 100644 --- a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json +++ b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json @@ -58,7 +58,8 @@ {"Index": 3, "Name": "Columns", "Type": "TExprBase"}, {"Index": 4, "Name": "Format", "Type": "TCoAtom"}, {"Index": 5, "Name": "Compression", "Type": "TCoAtom"}, - {"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true} + {"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true}, + {"Index": 7, "Name": "Settings", "Type": "TExprList", "Optional": true} ] }, { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp index 582456cc1a5..bb15fd399ba 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp @@ -125,12 +125,69 @@ public: format = "raw"; } + auto settings = Build<TExprList>(ctx, read.Pos()); + bool hasDateTimeFormat = false; + bool hasDateTimeFormatName = false; + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; + if (topicKeyParser.GetDateTimeFormatName()) { + settings.Add(topicKeyParser.GetDateTimeFormatName()); + hasDateTimeFormatName = true; + if (!NCommon::ValidateDateTimeFormatName(topicKeyParser.GetDateTimeFormatName()->Child(1)->Content(), ctx)) { + return nullptr; + } + } + + if (topicKeyParser.GetDateTimeFormat()) { + settings.Add(topicKeyParser.GetDateTimeFormat()); + hasDateTimeFormat = true; + } + + if (topicKeyParser.GetTimestampFormatName()) { + settings.Add(topicKeyParser.GetTimestampFormatName()); + hasDateTimeFormatName = true; + if (!NCommon::ValidateTimestampFormatName(topicKeyParser.GetTimestampFormatName()->Child(1)->Content(), ctx)) { + return nullptr; + } + } + + if (topicKeyParser.GetTimestampFormat()) { + settings.Add(topicKeyParser.GetTimestampFormat()); + hasTimestampFormat = true; + } + + if (hasDateTimeFormat && hasDateTimeFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together")); + return nullptr; + } + + if (hasTimestampFormat && hasTimestampFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together")); + return nullptr; + } + + if (!hasDateTimeFormat && !hasDateTimeFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(read.Pos(), "data.datetime.formatname")); + pair.push_back(ctx.NewAtom(read.Pos(), "POSIX")); + settings.Add(ctx.NewList(read.Pos(), std::move(pair))); + } + + if (!hasTimestampFormat && !hasTimestampFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(read.Pos(), "data.timestamp.formatname")); + pair.push_back(ctx.NewAtom(read.Pos(), "POSIX")); + settings.Add(ctx.NewList(read.Pos(), std::move(pair))); + } + auto builder = Build<TPqReadTopic>(ctx, read.Pos()) .World(read.World()) .DataSource(read.DataSource()) .Topic(std::move(topicNode)) .Format().Value(format).Build() - .Compression().Value(topicKeyParser.GetCompression()).Build(); + .Compression().Value(topicKeyParser.GetCompression()).Build() + .LimitHint<TCoVoid>().Build() + .Settings(settings.Done()); if (topicKeyParser.GetColumnOrder()) { builder.Columns(topicKeyParser.GetColumnOrder()); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index 639dcf3024e..2680ddb56b4 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -85,7 +85,7 @@ public: } TStatus HandleReadTopic(TExprBase input, TExprContext& ctx) { - if (!EnsureMinMaxArgsCount(input.Ref(), 6, 7, ctx)) { + if (!EnsureMinMaxArgsCount(input.Ref(), 6, 8, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index fbd03b35c7a..a9cfab9cee8 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -111,6 +111,12 @@ public: .Value(ctx.NewList(pqReadTopic.Pos(), std::move(metadataFieldsList))) .Done()); + + settings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos()) + .Name().Build("formatSettings") + .Value(std::move(pqReadTopic.Settings())) + .Done()); + const auto token = "cluster:default_" + clusterName; auto columns = pqReadTopic.Columns().Ptr(); if (!columns->IsList()) { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp index 0864b7b6170..2b3d9a8190a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp @@ -51,6 +51,22 @@ bool TTopicKeyParser::Parse(const TExprNode& expr, TExprNode::TPtr readSettings, Compression = readSettings->Child(i)->Child(1)->Content(); continue; } + if (readSettings->Child(i)->Head().IsAtom("data.datetime.formatname")) { + DateTimeFormatName = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("data.datetime.format")) { + DateTimeFormat = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("data.timestamp.formatname")) { + TimestampFormatName = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("data.timestamp.format")) { + TimestampFormat = readSettings->Child(i); + continue; + } } } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h index b22f61125fd..2fabecb2765 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h @@ -30,6 +30,22 @@ public: return Compression; } + TExprNode::TPtr GetDateTimeFormatName() const { + return DateTimeFormatName; + } + + TExprNode::TPtr GetDateTimeFormat() { + return DateTimeFormat; + } + + TExprNode::TPtr GetTimestampFormatName() { + return TimestampFormatName; + } + + TExprNode::TPtr GetTimestampFormat() { + return TimestampFormat; + } + bool Parse(const TExprNode& expr, TExprNode::TPtr readSettings, TExprContext& ctx); private: @@ -40,6 +56,10 @@ private: TString TopicPath; TString Format; TString Compression; + TExprNode::TPtr DateTimeFormatName; + TExprNode::TPtr DateTimeFormat; + TExprNode::TPtr TimestampFormatName; + TExprNode::TPtr TimestampFormat; TExprNode::TPtr UserSchema; TExprNode::TPtr ColumnOrder; }; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 8127b8af3c3..c0db287dd0c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1231,6 +1231,31 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializat return nullptr; } +NDB::FormatSettings::DateTimeFormat ToDateTimeFormat(const TString& formatName) { + static TMap<TString, NDB::FormatSettings::DateTimeFormat> formats{ + {"POSIX", NDB::FormatSettings::DateTimeFormat::POSIX}, + {"ISO", NDB::FormatSettings::DateTimeFormat::ISO} + }; + if (auto it = formats.find(formatName); it != formats.end()) { + return it->second; + } + return NDB::FormatSettings::DateTimeFormat::Unspecified; +} + +NDB::FormatSettings::TimestampFormat ToTimestampFormat(const TString& formatName) { + static TMap<TString, NDB::FormatSettings::TimestampFormat> formats{ + {"POSIX", NDB::FormatSettings::TimestampFormat::POSIX}, + {"ISO", NDB::FormatSettings::TimestampFormat::ISO}, + {"UNIX_TIME_MILLISECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMilliseconds}, + {"UNIX_TIME_SECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeSeconds}, + {"UNIX_TIME_MICROSECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMicroSeconds} + }; + if (auto it = formats.find(formatName); it != formats.end()) { + return it->second; + } + return NDB::FormatSettings::TimestampFormat::Unspecified; +} + } // namespace using namespace NKikimr::NMiniKQL; @@ -1317,6 +1342,30 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( if (const auto it = settings.find("csvdelimiter"); settings.cend() != it && !it->second.empty()) readSpec->Settings.csv.delimiter = it->second[0]; + if (const auto it = settings.find("data.datetime.formatname"); settings.cend() != it) { + readSpec->Settings.date_time_format_name = ToDateTimeFormat(it->second); + } + + if (const auto it = settings.find("data.datetime.format"); settings.cend() != it) { + readSpec->Settings.date_time_format = it->second; + } + + if (const auto it = settings.find("data.timestamp.formatname"); settings.cend() != it) { + readSpec->Settings.timestamp_format_name = ToTimestampFormat(it->second); + } + + if (const auto it = settings.find("data.timestamp.format"); settings.cend() != it) { + readSpec->Settings.timestamp_format = it->second; + } + + if (readSpec->Settings.date_time_format_name == NDB::FormatSettings::DateTimeFormat::Unspecified && readSpec->Settings.date_time_format.empty()) { + readSpec->Settings.date_time_format_name = NDB::FormatSettings::DateTimeFormat::POSIX; + } + + if (readSpec->Settings.timestamp_format_name == NDB::FormatSettings::TimestampFormat::Unspecified && readSpec->Settings.timestamp_format.empty()) { + readSpec->Settings.timestamp_format_name = NDB::FormatSettings::TimestampFormat::POSIX; + } + #define SUPPORTED_FLAGS(xx) \ xx(skip_unknown_fields, true) \ xx(import_nested_json, true) \ diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 6626328e136..cf888e5e8d0 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -115,6 +115,10 @@ private: if (!EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) return TStatus::Error; + bool hasDateTimeFormat = false; + bool hasDateTimeFormatName = false; + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; const auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { if (name == "compression") { const auto& value = setting.Tail(); @@ -151,6 +155,46 @@ private: return EnsureValidUserSchemaSetting(setting, ctx); } + if (name == "data.datetime.formatname") { + hasDateTimeFormatName = true; + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + return NCommon::ValidateDateTimeFormatName(value.Content(), ctx); + } + + if (name == "data.timestamp.formatname") { + hasTimestampFormatName = true; + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + return NCommon::ValidateTimestampFormatName(value.Content(), ctx); + } + + if (name == "data.datetime.format") { + hasDateTimeFormat = true; + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + return true; + } + + if (name == "data.timestamp.format") { + hasTimestampFormat = true; + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + return true; + } + if (name == "csvdelimiter") { const auto& value = setting.Tail(); if (!EnsureAtom(value, ctx)) { @@ -172,7 +216,17 @@ private: return true; }; - if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter", "filepattern"}, validator, ctx)) { + if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "csvdelimiter", "filepattern"}, validator, ctx)) { + return TStatus::Error; + } + + if (hasDateTimeFormat && hasDateTimeFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.datetime.format_name and data.datetime.format together")); + return TStatus::Error; + } + + if (hasTimestampFormat && hasTimestampFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together")); return TStatus::Error; } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 25c651a1e24..07703efa8fc 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -293,6 +293,10 @@ public: if (input->ChildrenSize() > TS3Object::idx_Settings) { bool haveProjection = false; bool havePartitionedBy = false; + bool hasDateTimeFormat = false; + bool hasDateTimeFormatName = false; + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { if (name != "partitionedby"sv && name != "directories"sv && setting.ChildrenSize() != 2) { ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), @@ -339,6 +343,42 @@ public: return NCommon::ValidateIntervalUnit(unit, ctx); } + if (name == "data.datetime.formatname"sv) { + hasDateTimeFormatName = true; + TStringBuf formatName; + if (!ExtractSettingValue(setting.Tail(), "data.datetime.format_name"sv, format, {}, ctx, formatName)) { + return false; + } + return NCommon::ValidateDateTimeFormatName(formatName, ctx); + } + + if (name == "data.datetime.format"sv) { + hasDateTimeFormat = true; + TStringBuf unused; + if (!ExtractSettingValue(setting.Tail(), "data.datetime.format"sv, format, {}, ctx, unused)) { + return false; + } + return true; + } + + if (name == "data.timestamp.formatname"sv) { + hasTimestampFormatName = true; + TStringBuf formatName; + if (!ExtractSettingValue(setting.Tail(), "data.timestamp.format_name"sv, format, {}, ctx, formatName)) { + return false; + } + return NCommon::ValidateTimestampFormatName(formatName, ctx); + } + + if (name == "data.timestamp.format"sv) { + hasTimestampFormat = true; + TStringBuf unused; + if (!ExtractSettingValue(setting.Tail(), "data.timestamp.format"sv, format, {}, ctx, unused)) { + return false; + } + return true; + } + if (name == "readmaxbytes"sv) { TStringBuf unused; if (!ExtractSettingValue(setting.Tail(), "read_max_bytes"sv, format, "raw"sv, ctx, unused)) { @@ -392,7 +432,8 @@ public: }; if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), { "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv, - "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx)) + "data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv, + "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx)) { return TStatus::Error; } @@ -400,6 +441,16 @@ public: ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Missing partitioned_by setting for projection")); return TStatus::Error; } + + if (hasDateTimeFormat && hasDateTimeFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.datetime.format_name and data.datetime.format together")); + return TStatus::Error; + } + + if (hasTimestampFormat && hasTimestampFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Object::idx_Settings)->Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together")); + return TStatus::Error; + } } input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 59b8e3e3038..290bb60cbd8 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -37,6 +37,22 @@ TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) { return FindChild(settings, "csvdelimiter"sv); } +TExprNode::TPtr GetDateTimeFormatName(const TExprNode& settings) { + return FindChild(settings, "data.datetime.formatname"sv); +} + +TExprNode::TPtr GetDateTimeFormat(const TExprNode& settings) { + return FindChild(settings, "data.datetime.format"sv); +} + +TExprNode::TPtr GetTimestampFormatName(const TExprNode& settings) { + return FindChild(settings, "data.timestamp.formatname"sv); +} + +TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) { + return FindChild(settings, "data.timestamp.format"sv); +} + TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { if (partBy) { auto children = partBy->ChildrenList(); @@ -111,6 +127,44 @@ public: sinkOutputSettingsBuilder.Add(std::move(csvDelimiter)); } + bool hasDateTimeFormat = false; + bool hasDateTimeFormatName = false; + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; + if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) { + sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName)); + hasDateTimeFormatName = true; + } + + if (auto dateTimeFormat = GetDateTimeFormat(settings)) { + sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat)); + hasDateTimeFormat = true; + } + + if (auto timestampFormatName = GetTimestampFormatName(settings)) { + sinkOutputSettingsBuilder.Add(std::move(timestampFormatName)); + hasTimestampFormatName = true; + } + + if (auto timestampFormat = GetTimestampFormat(settings)) { + sinkOutputSettingsBuilder.Add(std::move(timestampFormat)); + hasTimestampFormat = true; + } + + if (!hasDateTimeFormat && !hasDateTimeFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.datetime.formatname")); + pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX")); + sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); + } + + if (!hasTimestampFormat && !hasTimestampFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.timestamp.formatname")); + pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX")); + sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); + } + if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink."; return keys.empty() ? diff --git a/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp b/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp index 28480e05df9..110f83b24f1 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/base/common/JSON.cpp @@ -477,7 +477,7 @@ JSON::Pos JSON::searchField(const char * data, size_t size) const { if (static_cast<int>(size) + 2 > it->dataEnd() - it->data()) continue; - if (!strncmp(data, it->data() + 1, size)) + if (!strncmp(data, it->data() + 1, size) && *(it->data() + size + 1) == '"') break; } else diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index 5d8cb4a0849..556648d6915 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -798,6 +798,31 @@ private: size_t CurrentRow = 0U;
};
+static NDB::FormatSettings::DateTimeFormat ToDateTimeFormat(const std::string& formatName) {
+ static std::map<std::string, NDB::FormatSettings::DateTimeFormat> formats{
+ {"POSIX", NDB::FormatSettings::DateTimeFormat::POSIX},
+ {"ISO", NDB::FormatSettings::DateTimeFormat::ISO}
+ };
+ if (auto it = formats.find(formatName); it != formats.end()) {
+ return it->second;
+ }
+ return NDB::FormatSettings::DateTimeFormat::Unspecified;
+}
+
+static NDB::FormatSettings::TimestampFormat ToTimestampFormat(const std::string& formatName) {
+ static std::map<std::string, NDB::FormatSettings::TimestampFormat> formats{
+ {"POSIX", NDB::FormatSettings::TimestampFormat::POSIX},
+ {"ISO", NDB::FormatSettings::TimestampFormat::ISO},
+ {"UNIX_TIME_MILLISECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMilliseconds},
+ {"UNIX_TIME_SECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeSeconds},
+ {"UNIX_TIME_MICROSECONDS", NDB::FormatSettings::TimestampFormat::UnixTimeMicroSeconds}
+ };
+ if (auto it = formats.find(formatName); it != formats.end()) {
+ return it->second;
+ }
+ return NDB::FormatSettings::TimestampFormat::Unspecified;
+}
+
NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
NDB::FormatSettings settings;
settings.skip_unknown_fields = true;
@@ -826,6 +851,26 @@ NDB::FormatSettings GetFormatSettings(const std::string_view& view) { }
settings.csv.delimiter = delimiter[0];
}
+
+ if (json.has("data.datetime.formatname")) {
+ auto formatName = json["data.datetime.formatname"].getString();
+ settings.date_time_format_name = ToDateTimeFormat(formatName);
+ }
+
+ if (json.has("data.datetime.format")) {
+ auto format = json["data.datetime.format"].getString();
+ settings.date_time_format = format;
+ }
+
+ if (json.has("data.timestamp.formatname")) {
+ auto formatName = json["data.timestamp.formatname"].getString();
+ settings.timestamp_format_name = ToTimestampFormat(formatName);
+ }
+
+ if (json.has("data.timestamp.format")) {
+ auto format = json["data.timestamp.format"].getString();
+ settings.timestamp_format = format;
+ }
}
return settings;
}
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp index fef732c0e11..c8f9c167f68 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Columns/ColumnFunction.cpp @@ -237,7 +237,7 @@ ColumnWithTypeAndName ColumnFunction::reduce() const if (is_function_compiled) ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute); - res.column = function->execute(columns, res.type, size_); + res.column = function->execute(columns, res.type, size_, {}); return res; } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h index b5d76f216ae..f8595d8217d 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Core/Field.h @@ -788,7 +788,8 @@ template <> struct Field::EnumToType<Field::Types::AggregateFunctionState> { usi inline constexpr bool isInt64OrUInt64FieldType(Field::Types::Which t) { return t == Field::Types::Int64 - || t == Field::Types::UInt64; + || t == Field::Types::UInt64 + || t == Field::Types::Decimal64; } // Field value getter with type checking in debug builds. diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp index ff30075faf5..18320c6ac54 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime.cpp @@ -19,6 +19,22 @@ namespace inline void readText(time_t & x, ReadBuffer & istr, const FormatSettings & settings, const DateLUTImpl & time_zone, const DateLUTImpl & utc_time_zone) { + if (!settings.date_time_format.empty()) { + readDateTimeTextFormat(x, istr, settings.date_time_format); + return; + } + + switch (settings.date_time_format_name) { + case FormatSettings::DateTimeFormat::Unspecified: + break; + case FormatSettings::DateTimeFormat::ISO: + readDateTimeTextISO(x, istr); + return; + case FormatSettings::DateTimeFormat::POSIX: + readDateTimeTextPOSIX(x, istr); + return; + } + switch (settings.date_time_input_format) { case FormatSettings::DateTimeInputFormat::Basic: @@ -41,6 +57,22 @@ SerializationDateTime::SerializationDateTime( void SerializationDateTime::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const { auto value = assert_cast<const ColumnType &>(column).getData()[row_num]; + if (!settings.date_time_format.empty()) { + writeDateTimeTextFormat(value, ostr, settings.date_time_format); + return; + } + + switch (settings.date_time_format_name) { + case FormatSettings::DateTimeFormat::Unspecified: + break; + case FormatSettings::DateTimeFormat::ISO: + writeDateTimeTextISO(value, ostr, utc_time_zone); + return; + case FormatSettings::DateTimeFormat::POSIX: + writeDateTimeTextPOSIX(value, ostr); + return; + } + switch (settings.date_time_output_format) { case FormatSettings::DateTimeOutputFormat::Simple: diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp index eda2e54a22c..39e63f87d04 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationDateTime64.cpp @@ -26,6 +26,31 @@ SerializationDateTime64::SerializationDateTime64( void SerializationDateTime64::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const { auto value = assert_cast<const ColumnType &>(column).getData()[row_num]; + if (!settings.timestamp_format.empty()) { + writeTimestampTextFormat(value, ostr, settings.timestamp_format); + return; + } + + switch (settings.timestamp_format_name) { + case FormatSettings::TimestampFormat::Unspecified: + break; + case FormatSettings::TimestampFormat::ISO: + writeDateTimeTextISO(value, scale, ostr, utc_time_zone); + return; + case FormatSettings::TimestampFormat::POSIX: + writeTimestampTextPOSIX(value, ostr); + return; + case FormatSettings::TimestampFormat::UnixTimeMicroSeconds: + writeIntText(value.value, ostr); + return; + case FormatSettings::TimestampFormat::UnixTimeMilliseconds: + writeIntText(value.value / 1000, ostr); + return; + case FormatSettings::TimestampFormat::UnixTimeSeconds: + writeIntText(value.value / 1000000, ostr); + return; + } + switch (settings.date_time_output_format) { case FormatSettings::DateTimeOutputFormat::Simple: @@ -59,6 +84,35 @@ void SerializationDateTime64::serializeTextEscaped(const IColumn & column, size_ static inline void readText(DateTime64 & x, UInt32 scale, ReadBuffer & istr, const FormatSettings & settings, const DateLUTImpl & time_zone, const DateLUTImpl & utc_time_zone) { + if (!settings.timestamp_format.empty()) { + readTimestampTextFormat(x, istr, settings.timestamp_format); + return; + } + + switch (settings.timestamp_format_name) { + case FormatSettings::TimestampFormat::Unspecified: + break; + case FormatSettings::TimestampFormat::ISO: + readTimestampTextISO(x, istr); + return; + case FormatSettings::TimestampFormat::POSIX: + readTimestampTextPOSIX(x, istr); + return; + case FormatSettings::TimestampFormat::UnixTimeMicroSeconds: + readIntText(x, istr); + return; + case FormatSettings::TimestampFormat::UnixTimeMilliseconds: { + readIntText(x, istr); + x *= 1000; + return; + } + case FormatSettings::TimestampFormat::UnixTimeSeconds: { + readIntText(x, istr); + x *= 1000000; + return; + } + } + switch (settings.date_time_input_format) { case FormatSettings::DateTimeInputFormat::Basic: diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h index 47f6ee7bf27..d75c241d411 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h @@ -29,6 +29,26 @@ struct FormatSettings bool import_nested_json = false; bool null_as_default = true; bool decimal_trailing_zeros = false; + String date_time_format; + String timestamp_format; + + enum class DateTimeFormat { + Unspecified, + ISO, + POSIX + }; + + enum class TimestampFormat { + Unspecified, + ISO, + POSIX, + UnixTimeMilliseconds, + UnixTimeSeconds, + UnixTimeMicroSeconds + }; + + DateTimeFormat date_time_format_name = DateTimeFormat::Unspecified; + TimestampFormat timestamp_format_name = TimestampFormat::Unspecified; enum class DateTimeInputFormat { diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h index 99f14b67984..a4073f4637a 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h @@ -2,6 +2,8 @@ #include <type_traits> +#include <ydb/library/yql/utils/backtrace/backtrace.h> + #include <IO/WriteBufferFromVector.h> #include <IO/ReadBufferFromMemory.h> #include <IO/Operators.h> @@ -883,13 +885,13 @@ struct ConvertImplGenericToString /** Conversion of strings to numbers, dates, datetimes: through parsing. */ template <typename DataType> -void parseImpl(typename DataType::FieldType & x, ReadBuffer & rb, const DateLUTImpl *) +void parseImpl(typename DataType::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings) { readText(x, rb); } template <> -inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb, const DateLUTImpl *) +inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings) { DayNum tmp(0); readDateText(tmp, rb); @@ -897,7 +899,7 @@ inline void parseImpl<DataTypeDate>(DataTypeDate::FieldType & x, ReadBuffer & rb } template <> -inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *) +inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings) { ExtendedDayNum tmp(0); readDateText(tmp, rb); @@ -906,9 +908,30 @@ inline void parseImpl<DataTypeDate32>(DataTypeDate32::FieldType & x, ReadBuffer // NOTE: no need of extra overload of DateTime64, since readDateTimeText64 has different signature and that case is explicitly handled in the calling code. template <> -inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone) +inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuffer & rb, const DateLUTImpl * time_zone, const FormatSettings & format_settings) { time_t time = 0; + if (!format_settings.date_time_format.empty()) { + readDateTimeTextFormat(time, rb, format_settings.date_time_format); + x = time; + return; + } + + switch (format_settings.date_time_format_name) { + case FormatSettings::DateTimeFormat::Unspecified: + break; + case FormatSettings::DateTimeFormat::ISO: { + readDateTimeTextISO(time, rb); + x = time; + return; + } + case FormatSettings::DateTimeFormat::POSIX: { + readDateTimeTextPOSIX(time, rb); + x = time; + return; + } + } + readDateTimeText(time, rb, *time_zone); if (time < 0) time = 0; @@ -917,7 +940,7 @@ inline void parseImpl<DataTypeDateTime>(DataTypeDateTime::FieldType & x, ReadBuf template <> -inline void parseImpl<DataTypeUUID>(DataTypeUUID::FieldType & x, ReadBuffer & rb, const DateLUTImpl *) +inline void parseImpl<DataTypeUUID>(DataTypeUUID::FieldType & x, ReadBuffer & rb, const DateLUTImpl *, const FormatSettings & format_settings) { UUID tmp; readUUIDText(tmp, rb); @@ -1016,6 +1039,39 @@ enum class ConvertFromStringParsingMode BestEffortUS }; +static inline void readTextTimestamp64(DateTime64 & x, UInt32 scale, ReadBuffer & istr, const DateLUTImpl & local_time_zone, const FormatSettings & settings) +{ + if (!settings.timestamp_format.empty()) { + readTimestampTextFormat(x, istr, settings.timestamp_format); + return; + } + + switch (settings.timestamp_format_name) { + case FormatSettings::TimestampFormat::Unspecified: + break; + case FormatSettings::TimestampFormat::ISO: + readTimestampTextISO(x, istr); + return; + case FormatSettings::TimestampFormat::POSIX: + readTimestampTextPOSIX(x, istr); + return; + case FormatSettings::TimestampFormat::UnixTimeMicroSeconds: + readIntText(x, istr); + return; + case FormatSettings::TimestampFormat::UnixTimeMilliseconds: { + readIntText(x, istr); + x *= 1000; + return; + } + case FormatSettings::TimestampFormat::UnixTimeSeconds: { + readIntText(x, istr); + x *= 1000000; + return; + } + } + readDateTime64Text(x, scale, istr, local_time_zone); +} + template <typename FromDataType, typename ToDataType, typename Name, ConvertFromStringExceptionMode exception_mode, ConvertFromStringParsingMode parsing_mode> struct ConvertThroughParsing @@ -1046,7 +1102,7 @@ struct ConvertThroughParsing template <typename Additions = void *> static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & res_type, size_t input_rows_count, - Additions additions [[maybe_unused]] = Additions()) + Additions additions [[maybe_unused]] = Additions(), const FormatSettings & format_settings = {}) { using ColVecTo = typename ToDataType::ColumnType; @@ -1173,7 +1229,7 @@ struct ConvertThroughParsing if constexpr (to_datetime64) { DateTime64 value = 0; - readDateTime64Text(value, vec_to.getScale(), read_buffer, *local_time_zone); + readTextTimestamp64(value, vec_to.getScale(), read_buffer, *local_time_zone, format_settings); vec_to[i] = value; } else if constexpr (IsDataTypeDecimal<ToDataType>) @@ -1181,7 +1237,7 @@ struct ConvertThroughParsing vec_to[i], read_buffer, ToDataType::maxPrecision(), vec_to.getScale()); else { - parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone); + parseImpl<ToDataType>(vec_to[i], read_buffer, local_time_zone, format_settings); } } @@ -1601,11 +1657,11 @@ public: } bool canBeExecutedOnDefaultArguments() const override { return false; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override { try { - return executeInternal(arguments, result_type, input_rows_count); + return executeInternal(arguments, result_type, input_rows_count, format_settings); } catch (Exception & e) { @@ -1649,7 +1705,7 @@ private: mutable bool checked_return_type = false; mutable bool to_nullable = false; - ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const + ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const { if (arguments.empty()) throw Exception{"Function " + getName() + " expects at least 1 argument", @@ -1712,9 +1768,13 @@ private: else result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count); } + else if constexpr (std::is_base_of_v<ConvertThroughParsing<LeftDataType, RightDataType, Name, ConvertFromStringExceptionMode::Throw, ConvertFromStringParsingMode::Normal>, ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>>) + { + result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count, {}, format_settings); + } else { - result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count); + result_column = ConvertImpl<LeftDataType, RightDataType, Name, SpecialTag>::execute(arguments, result_type, input_rows_count/*, format_settings*/); } return true; @@ -1890,30 +1950,30 @@ public: } template <typename ConvertToDataType> - ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, UInt32 scale = 0) const + ColumnPtr executeInternal(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, UInt32 scale = 0) const { const IDataType * from_type = arguments[0].type.get(); if (checkAndGetDataType<DataTypeString>(from_type)) { return ConvertThroughParsing<DataTypeString, ConvertToDataType, Name, exception_mode, parsing_mode>::execute( - arguments, result_type, input_rows_count, scale); + arguments, result_type, input_rows_count, scale, format_settings); } else if (checkAndGetDataType<DataTypeFixedString>(from_type)) { return ConvertThroughParsing<DataTypeFixedString, ConvertToDataType, Name, exception_mode, parsing_mode>::execute( - arguments, result_type, input_rows_count, scale); + arguments, result_type, input_rows_count, scale, format_settings); } return nullptr; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override { ColumnPtr result_column; if constexpr (to_decimal) - result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count, + result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count, format_settings, assert_cast<const ToDataType &>(*removeNullable(result_type)).getScale()); else { @@ -1924,15 +1984,15 @@ public: scale = extractToDecimalScale(arguments[1]); if (scale == 0) - result_column = executeInternal<DataTypeDateTime>(arguments, result_type, input_rows_count); + result_column = executeInternal<DataTypeDateTime>(arguments, result_type, input_rows_count, format_settings); else { - result_column = executeInternal<DataTypeDateTime64>(arguments, result_type, input_rows_count, static_cast<UInt32>(scale)); + result_column = executeInternal<DataTypeDateTime64>(arguments, result_type, input_rows_count, format_settings, static_cast<UInt32>(scale)); } } else { - result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count); + result_column = executeInternal<ToDataType>(arguments, result_type, input_rows_count, format_settings); } } @@ -2391,7 +2451,7 @@ public: String getName() const override { return name; } protected: - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const override { /// drop second argument, pass others ColumnsWithTypeAndName new_arguments{arguments.front()}; @@ -2460,12 +2520,12 @@ public: const DataTypes & getArgumentTypes() const override { return argument_types; } const DataTypePtr & getResultType() const override { return return_type; } - ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*sample_columns*/) const override + ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*sample_columns*/, const FormatSettings & format_settings) const override { try { return std::make_unique<ExecutableFunctionCast>( - prepareUnpackDictionaries(getArgumentTypes()[0], getResultType()), cast_name, diagnostic); + prepareUnpackDictionaries(getArgumentTypes()[0], getResultType(), format_settings), cast_name, diagnostic); } catch (Exception & e) { @@ -2503,14 +2563,14 @@ private: std::optional<Diagnostic> diagnostic; CastType cast_type; - static WrapperType createFunctionAdaptor(FunctionPtr function, const DataTypePtr & from_type) + static WrapperType createFunctionAdaptor(FunctionPtr function, const DataTypePtr & from_type, const FormatSettings & format_settings) { auto function_adaptor = std::make_unique<FunctionToOverloadResolverAdaptor>(function)->build({ColumnWithTypeAndName{nullptr, from_type, ""}}); - return [function_adaptor] + return [function_adaptor, format_settings] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count) { - return function_adaptor->execute(arguments, result_type, input_rows_count); + return function_adaptor->execute(arguments, result_type, input_rows_count, format_settings); }; } @@ -2525,7 +2585,7 @@ private: } template <typename ToDataType> - WrapperType createWrapper(const DataTypePtr & from_type, const ToDataType * const to_type, bool requested_result_is_nullable) const + WrapperType createWrapper(const DataTypePtr & from_type, const ToDataType * const to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const { TypeIndex from_type_index = from_type->getTypeId(); WhichDataType which(from_type_index); @@ -2537,12 +2597,12 @@ private: /// In case when converting to Nullable type, we apply different parsing rule, /// that will not throw an exception but return NULL in case of malformed input. FunctionPtr function = FunctionConvertFromString<ToDataType, FunctionName, ConvertFromStringExceptionMode::Null>::create(); - return createFunctionAdaptor(function, from_type); + return createFunctionAdaptor(function, from_type, format_settings); } else if (!can_apply_accurate_cast) { FunctionPtr function = FunctionTo<ToDataType>::Type::create(); - return createFunctionAdaptor(function, from_type); + return createFunctionAdaptor(function, from_type, format_settings); } auto wrapper_cast_type = cast_type; @@ -2597,7 +2657,7 @@ private: static WrapperType createStringWrapper(const DataTypePtr & from_type) { FunctionPtr function = FunctionToString::create(); - return createFunctionAdaptor(function, from_type); + return createFunctionAdaptor(function, from_type, {}); } WrapperType createFixedStringWrapper(const DataTypePtr & from_type, const size_t N) const @@ -2617,7 +2677,7 @@ private: template <typename ToDataType> std::enable_if_t<IsDataTypeDecimal<ToDataType>, WrapperType> - createDecimalWrapper(const DataTypePtr & from_type, const ToDataType * to_type, bool requested_result_is_nullable) const + createDecimalWrapper(const DataTypePtr & from_type, const ToDataType * to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const { TypeIndex type_index = from_type->getTypeId(); UInt32 scale = to_type->getScale(); @@ -2636,7 +2696,7 @@ private: auto wrapper_cast_type = cast_type; - return [wrapper_cast_type, type_index, scale, to_type, requested_result_is_nullable] + return [wrapper_cast_type, type_index, scale, to_type, requested_result_is_nullable, format_settings] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *column_nullable, size_t input_rows_count) { ColumnPtr result_column; @@ -2680,8 +2740,11 @@ private: return true; } } - - result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale); + if constexpr (std::is_base_of_v<ConvertThroughParsing<LeftDataType, RightDataType, FunctionName, ConvertFromStringExceptionMode::Throw, ConvertFromStringParsingMode::Normal>, ConvertImpl<LeftDataType, RightDataType, FunctionName>>) { + result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale, format_settings); + } else { + result_column = ConvertImpl<LeftDataType, RightDataType, FunctionName>::execute(arguments, result_type, input_rows_count, scale); + } return true; }); @@ -2971,7 +3034,7 @@ private: else if (isNativeNumber(from_type) || isEnum(from_type)) { auto function = Function::create(); - return createFunctionAdaptor(function, from_type); + return createFunctionAdaptor(function, from_type, {}); } else { @@ -3075,7 +3138,7 @@ private: }; } - WrapperType prepareUnpackDictionaries(const DataTypePtr & from_type, const DataTypePtr & to_type) const + WrapperType prepareUnpackDictionaries(const DataTypePtr & from_type, const DataTypePtr & to_type, const FormatSettings & format_settings = {}) const { const auto * from_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(from_type.get()); const auto * to_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(to_type.get()); @@ -3108,7 +3171,7 @@ private: /// Disable check for dictionary. Will check that column doesn't contain NULL in wrapper below. skip_not_null_check = true; - auto wrapper = prepareRemoveNullable(from_nested, to_nested, skip_not_null_check); + auto wrapper = prepareRemoveNullable(from_nested, to_nested, skip_not_null_check, format_settings); if (!from_low_cardinality && !to_low_cardinality) return wrapper; @@ -3177,14 +3240,14 @@ private: }; } - WrapperType prepareRemoveNullable(const DataTypePtr & from_type, const DataTypePtr & to_type, bool skip_not_null_check) const + WrapperType prepareRemoveNullable(const DataTypePtr & from_type, const DataTypePtr & to_type, bool skip_not_null_check, const FormatSettings & format_settings) const { /// Determine whether pre-processing and/or post-processing must take place during conversion. bool source_is_nullable = from_type->isNullable(); bool result_is_nullable = to_type->isNullable(); - auto wrapper = prepareImpl(removeNullable(from_type), removeNullable(to_type), result_is_nullable); + auto wrapper = prepareImpl(removeNullable(from_type), removeNullable(to_type), result_is_nullable, format_settings); if (result_is_nullable) { @@ -3255,7 +3318,7 @@ private: /// 'from_type' and 'to_type' are nested types in case of Nullable. /// 'requested_result_is_nullable' is true if CAST to Nullable type is requested. - WrapperType prepareImpl(const DataTypePtr & from_type, const DataTypePtr & to_type, bool requested_result_is_nullable) const + WrapperType prepareImpl(const DataTypePtr & from_type, const DataTypePtr & to_type, bool requested_result_is_nullable, const FormatSettings & format_settings) const { if (from_type->equals(*to_type)) return createIdentityWrapper(from_type); @@ -3289,7 +3352,7 @@ private: std::is_same_v<ToDataType, DataTypeDateTime> || std::is_same_v<ToDataType, DataTypeUUID>) { - ret = createWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable); + ret = createWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable, format_settings); return true; } if constexpr ( @@ -3306,7 +3369,7 @@ private: std::is_same_v<ToDataType, DataTypeDecimal<Decimal256>> || std::is_same_v<ToDataType, DataTypeDateTime64>) { - ret = createDecimalWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable); + ret = createDecimalWrapper(from_type, checkAndGetDataType<ToDataType>(to_type.get()), requested_result_is_nullable, format_settings); return true; } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h index 81d77b99c12..d848c611d81 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsMiscellaneous.h @@ -35,7 +35,7 @@ public: String getName() const override { return "FunctionExpression"; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, const FormatSettings & format_settings) const override { Block expr_columns; for (size_t i = 0; i < arguments.size(); ++i) @@ -82,7 +82,7 @@ public: const DataTypes & getArgumentTypes() const override { return argument_types; } const DataTypePtr & getResultType() const override { return return_type; } - ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override + ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &, const FormatSettings & format_settings = {}) const override { return std::make_unique<ExecutableFunctionExpression>(expression_actions, signature); } @@ -120,7 +120,7 @@ public: bool useDefaultImplementationForNulls() const override { return false; } bool useDefaultImplementationForLowCardinalityColumns() const override { return false; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count, const FormatSettings & format_settings) const override { Names names; DataTypes types; @@ -175,7 +175,7 @@ public: const DataTypes & getArgumentTypes() const override { return capture->captured_types; } const DataTypePtr & getResultType() const override { return return_type; } - ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override + ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &, const FormatSettings & format_settings = {}) const override { return std::make_unique<ExecutableFunctionCapture>(expression_actions, capture); } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp index 3f1e989db62..60fbdcfbc1b 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.cpp @@ -115,7 +115,7 @@ void convertLowCardinalityColumnsToFull(ColumnsWithTypeAndName & args) } ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const { ColumnNumbers arguments_to_remain_constants = getArgumentsThatAreAlwaysConstant(); @@ -158,7 +158,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments( "Number of arguments for function {} doesn't match: the function requires more arguments", getName()); - ColumnPtr result_column = executeWithoutLowCardinalityColumns(temporary_columns, result_type, 1, dry_run); + ColumnPtr result_column = executeWithoutLowCardinalityColumns(temporary_columns, result_type, 1, format_settings, dry_run); /// extremely rare case, when we have function with completely const arguments /// but some of them produced by non isDeterministic function @@ -170,7 +170,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForConstantArguments( ColumnPtr IExecutableFunction::defaultImplementationForNulls( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const { if (args.empty() || !useDefaultImplementationForNulls()) return nullptr; @@ -194,7 +194,7 @@ ColumnPtr IExecutableFunction::defaultImplementationForNulls( ColumnsWithTypeAndName temporary_columns = createBlockWithNestedColumns(args); auto temporary_result_type = removeNullable(result_type); - auto res = executeWithoutLowCardinalityColumns(temporary_columns, temporary_result_type, input_rows_count, dry_run); + auto res = executeWithoutLowCardinalityColumns(temporary_columns, temporary_result_type, input_rows_count, format_settings, dry_run); return wrapInNullable(res, args, result_type, input_rows_count); } @@ -202,19 +202,19 @@ ColumnPtr IExecutableFunction::defaultImplementationForNulls( } ColumnPtr IExecutableFunction::executeWithoutLowCardinalityColumns( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const { - if (auto res = defaultImplementationForConstantArguments(args, result_type, input_rows_count, dry_run)) + if (auto res = defaultImplementationForConstantArguments(args, result_type, input_rows_count, format_settings, dry_run)) return res; - if (auto res = defaultImplementationForNulls(args, result_type, input_rows_count, dry_run)) + if (auto res = defaultImplementationForNulls(args, result_type, input_rows_count, format_settings, dry_run)) return res; ColumnPtr res; if (dry_run) - res = executeDryRunImpl(args, result_type, input_rows_count); + res = executeDryRunImpl(args, result_type, input_rows_count, format_settings); else - res = executeImpl(args, result_type, input_rows_count); + res = executeImpl(args, result_type, input_rows_count, format_settings); if (!res) throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty column was returned by function {}", getName()); @@ -222,7 +222,7 @@ ColumnPtr IExecutableFunction::executeWithoutLowCardinalityColumns( return res; } -ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const +ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const { ColumnPtr result; @@ -242,7 +242,7 @@ ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, ? input_rows_count : columns_without_low_cardinality.front().column->size(); - auto res = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, dictionary_type, new_input_rows_count, dry_run); + auto res = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, dictionary_type, new_input_rows_count, format_settings, dry_run); auto keys = res->convertToFullColumnIfConst(); auto res_mut_dictionary = DataTypeLowCardinality::createColumnUnique(*res_low_cardinality_type->getDictionaryType()); @@ -257,11 +257,11 @@ ColumnPtr IExecutableFunction::execute(const ColumnsWithTypeAndName & arguments, else { convertLowCardinalityColumnsToFull(columns_without_low_cardinality); - result = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, result_type, input_rows_count, dry_run); + result = executeWithoutLowCardinalityColumns(columns_without_low_cardinality, result_type, input_rows_count, format_settings, dry_run); } } else - result = executeWithoutLowCardinalityColumns(arguments, result_type, input_rows_count, dry_run); + result = executeWithoutLowCardinalityColumns(arguments, result_type, input_rows_count, format_settings, dry_run); return result; } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h index 98a71698aff..af36eda9389 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunction.h @@ -4,6 +4,7 @@ #include <Core/ColumnsWithTypeAndName.h> #include <Core/Names.h> #include <DataTypes/IDataType.h> +#include <Formats/FormatSettings.h> //#if !defined(ARCADIA_BUILD) //# include "config_core.h" @@ -46,15 +47,15 @@ public: /// Get the main function name. virtual String getName() const = 0; - ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const; + ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const; protected: - virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const = 0; + virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const = 0; - virtual ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const + virtual ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const { - return executeImpl(arguments, result_type, input_rows_count); + return executeImpl(arguments, result_type, input_rows_count, format_settings); } /** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following: @@ -90,13 +91,13 @@ protected: private: ColumnPtr defaultImplementationForConstantArguments( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const; + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const; ColumnPtr defaultImplementationForNulls( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const; + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const; ColumnPtr executeWithoutLowCardinalityColumns( - const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const; + const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run) const; }; @@ -114,9 +115,9 @@ public: virtual ~IFunctionBase() = default; virtual ColumnPtr execute( - const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run = false) const + const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings, bool dry_run = false) const { - return prepare(arguments)->execute(arguments, result_type, input_rows_count, dry_run); + return prepare(arguments, format_settings)->execute(arguments, result_type, input_rows_count, format_settings, dry_run); } /// Get the main function name. @@ -127,7 +128,7 @@ public: /// Do preparations and return executable. /// sample_columns should contain data types of arguments and values of constants, if relevant. - virtual ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & arguments) const = 0; + virtual ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & arguments, const FormatSettings & format_settings = {}) const = 0; #if USE_EMBEDDED_COMPILER @@ -375,10 +376,10 @@ public: virtual String getName() const = 0; - virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const = 0; - virtual ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const + virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings = {}) const = 0; + virtual ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings = {}) const { - return executeImpl(arguments, result_type, input_rows_count); + return executeImpl(arguments, result_type, input_rows_count, format_settings); } /** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following: diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h index a6cc0c907bf..865f4495738 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/IFunctionAdaptors.h @@ -16,14 +16,14 @@ public: protected: - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const final + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const final { - return function->executeImpl(arguments, result_type, input_rows_count); + return function->executeImpl(arguments, result_type, input_rows_count, format_settings); } - ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const final + ColumnPtr executeDryRunImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, const FormatSettings & format_settings) const final { - return function->executeImplDryRun(arguments, result_type, input_rows_count); + return function->executeImplDryRun(arguments, result_type, input_rows_count, format_settings); } bool useDefaultImplementationForNulls() const final { return function->useDefaultImplementationForNulls(); } @@ -60,7 +60,7 @@ public: #endif - ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*arguments*/) const override + ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName & /*arguments*/, const FormatSettings & format_settings = {}) const override { return std::make_unique<FunctionToExecutableFunctionAdaptor>(function); } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h index 8148cf3a27a..6ecc168f8d5 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Functions/toFixedString.h @@ -60,7 +60,7 @@ public: bool useDefaultImplementationForConstants() const override { return true; } ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/, const FormatSettings & format_settings) const override { const auto n = arguments[1].column->getUInt(0); return executeForN<ConvertToFixedStringExceptionMode::Throw>(arguments, n); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp index 016c9cb69c7..879b6f0dfe1 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.cpp @@ -10,6 +10,8 @@ #include <common/find_symbols.h> #include <stdlib.h> +#include <util/datetime/base.h> + #ifdef __SSE2__ #include <emmintrin.h> #endif @@ -1166,4 +1168,68 @@ bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current) return loaded_more; } +void readDateTimeTextISO(time_t & x, ReadBuffer & istr) { + for (const auto& offset: {25, 20}) { + if (istr.position() + offset <= istr.buffer().end()) { + TInstant result; + if (TInstant::TryParseIso8601(TStringBuf{istr.position(), istr.position() + offset}, result)) { + x = result.GetValue() / 1000000; + istr.position() += offset; + return; + } + } + } + throw yexception() << "error in datetime parsing. Input data: " << istr.position(); +} + +void readDateTimeTextFormat(time_t & x, ReadBuffer & istr, const String& format) +{ + struct tm input_tm; + memset(&input_tm, 0, sizeof(tm)); + input_tm.tm_mday = 1; + auto ptr = strptime(istr.position(), format.c_str(), &input_tm); + if (ptr == nullptr) { + ythrow yexception() << "Can't parse date " << istr.position() << " in " << format << " format"; + } + istr.position() = ptr; + x = TimeGM(&input_tm); +} + +void readDateTimeTextPOSIX(time_t & x, ReadBuffer & istr) +{ + readDateTimeTextFormat(x, istr, "%Y-%m-%d %H:%M:%S"); +} + +void readTimestampTextISO(DateTime64 & x, ReadBuffer & istr) { + for (const auto& offset: {27, 25, 24, 20}) { + if (istr.position() + offset <= istr.buffer().end()) { + TInstant result; + if (TInstant::TryParseIso8601(TStringBuf{istr.position(), istr.position() + offset}, result)) { + x = result.GetValue(); + istr.position() += offset; + return; + } + } + } + throw yexception() << "error in datetime parsing. Input data: " << istr.position(); +} + +void readTimestampTextFormat(DateTime64 & x, ReadBuffer & istr, const String& format) +{ + struct tm input_tm; + memset(&input_tm, 0, sizeof(tm)); + input_tm.tm_mday = 1; + auto ptr = strptime(istr.position(), format.c_str(), &input_tm); + if (ptr == nullptr) { + ythrow yexception() << "Can't parse date " << istr.position() << " in " << format << " format"; + } + istr.position() = ptr; + x = TimeGM(&input_tm) * 1000000; +} + +void readTimestampTextPOSIX(DateTime64 & x, ReadBuffer & istr) +{ + readTimestampTextFormat(x, istr, "%Y-%m-%d %H:%M:%S"); +} + } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h index e88d3d7cf52..29cf4c5af40 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h @@ -886,6 +886,18 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf) datetime.second((s[17] - '0') * 10 + (s[18] - '0')); } +void readDateTimeTextISO(time_t & x, ReadBuffer & istr); + +void readDateTimeTextFormat(time_t & x, ReadBuffer & istr, const String& format); + +void readDateTimeTextPOSIX(time_t & x, ReadBuffer & istr); + +void readTimestampTextISO(DateTime64 & x, ReadBuffer & istr); + +void readTimestampTextFormat(DateTime64 & x, ReadBuffer & istr, const String& format); + +void readTimestampTextPOSIX(DateTime64 & x, ReadBuffer & istr); + /// Generic methods to read value in native binary format. template <typename T> diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp index 5dd56030ab8..51e321f0f7a 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.cpp @@ -3,6 +3,9 @@ #include <Common/hex.h> #include <Common/StringUtils/StringUtils.h> +#include <util/datetime/base.h> +#include <util/generic/yexception.h> +#include <util/memory/tempbuf.h> namespace NDB { @@ -98,4 +101,43 @@ void writePointerHex(const void * ptr, WriteBuffer & buf) buf.write(hex_str, 2 * sizeof(ptr)); } +void writeDateTimeTextFormat(time_t datetime, WriteBuffer & buf, const String& format) +{ + struct tm tm{}; + gmtime_r(&datetime, &tm); + size_t size = Max<size_t>(format.size() * 2 + 1, 107); + TTempBuf tmp(size); + int r = strftime(tmp.Data(), tmp.Size(), format.c_str(), &tm); + if (r > 0) { + buf.write(tmp.Data(), r); + return; + } + ythrow yexception() << "Can't format date in format " << format; +} + +void writeDateTimeTextPOSIX(time_t datetime, WriteBuffer & buf) +{ + writeDateTimeTextFormat(datetime, buf, "%Y-%m-%d %H:%M:%S"); +} + +void writeTimestampTextFormat(DateTime64 datetime64, WriteBuffer & buf, const String& format) +{ + struct tm tm{}; + time_t t = datetime64 / 1000000; + gmtime_r(&t, &tm); + size_t size = Max<size_t>(format.size() * 2 + 1, 107); + TTempBuf tmp(size); + int r = strftime(tmp.Data(), tmp.Size(), format.c_str(), &tm); + if (r > 0) { + buf.write(tmp.Data(), r); + return; + } + ythrow yexception() << "Can't format date in format " << format; +} + +void writeTimestampTextPOSIX(DateTime64 datetime, WriteBuffer & buf) +{ + writeTimestampTextFormat(datetime, buf, "%Y-%m-%d %H:%M:%S"); +} + } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h index 1a15890b50f..8275fe6d4da 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h @@ -858,6 +858,14 @@ inline void writeDateTimeUnixTimestamp(DateTime64 datetime64, UInt32 scale, Writ } } +void writeDateTimeTextFormat(time_t datetime, WriteBuffer & buf, const String& format); + +void writeDateTimeTextPOSIX(time_t datetime, WriteBuffer & buf); + +void writeTimestampTextFormat(DateTime64 datetime64, WriteBuffer & buf, const String& format); + +void writeTimestampTextPOSIX(DateTime64 datetime, WriteBuffer & buf); + /// Methods for output in binary format. template <typename T> inline std::enable_if_t<is_arithmetic_v<T>, void> diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp index 6cdf67620d8..79bde3e54d0 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.cpp @@ -7,7 +7,7 @@ namespace NDB { template <CastType cast_type = CastType::nonAccurate> -static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type) +static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings) { if (arg.type->equals(*type)) return arg.column; @@ -28,27 +28,27 @@ static ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr if constexpr (cast_type == CastType::accurateOrNull) { - return func_cast->execute(arguments, makeNullable(type), arg.column->size()); + return func_cast->execute(arguments, makeNullable(type), arg.column->size(), format_settings); } else { - return func_cast->execute(arguments, type, arg.column->size()); + return func_cast->execute(arguments, type, arg.column->size(), format_settings); } } -ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type) +ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings) { - return castColumn<CastType::nonAccurate>(arg, type); + return castColumn<CastType::nonAccurate>(arg, type, format_settings); } -ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type) +ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings) { - return castColumn<CastType::accurate>(arg, type); + return castColumn<CastType::accurate>(arg, type, {}); } -ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type) +ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings) { - return castColumn<CastType::accurateOrNull>(arg, type); + return castColumn<CastType::accurateOrNull>(arg, type, {}); } } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h index 2378ba9e05e..86560e58ec5 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Interpreters/castColumn.h @@ -5,8 +5,8 @@ namespace NDB { -ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type); -ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type); -ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type); +ColumnPtr castColumn(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings); +ColumnPtr castColumnAccurate(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings); +ColumnPtr castColumnAccurateOrNull(const ColumnWithTypeAndName & arg, const DataTypePtr & type, const FormatSettings & format_settings); } diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp index ad76ba146a0..474b7391203 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -515,7 +515,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn( { } -void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table) +void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, const FormatSettings & format_settings) { Columns columns_list; UInt64 num_rows = 0; @@ -571,7 +571,7 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr else column = readColumnFromArrowColumn(arrow_column, header_column.name, format_name, false, dictionary_values); - column.column = castColumn(column, header_column.type); + column.column = castColumn(column, header_column.type, format_settings); column.type = header_column.type; num_rows = column.column->size(); columns_list.push_back(std::move(column.column)); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h index 5534eac8532..cfc12133980 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h @@ -27,7 +27,7 @@ public: /// data from file without knowing table structure. ArrowColumnToCHColumn(const arrow::Schema & schema, const std::string & format_name, bool import_nested_); - void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table); + void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, const FormatSettings & format_settings); private: const Block header; diff --git a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index c2d6ee1b0ea..de3bfdb8a60 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -54,7 +54,7 @@ Chunk ParquetBlockInputFormat::generate() ++row_group_current; - arrow_column_to_ch_column->arrowTableToCHChunk(res, table); + arrow_column_to_ch_column->arrowTableToCHChunk(res, table, format_settings); return res; } |