diff options
author | uzhas <uzhas@ydb.tech> | 2022-10-26 13:43:35 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-10-26 13:43:35 +0300 |
commit | 2506981241ce7508b4cf18d5bc5007cdd102e18b (patch) | |
tree | 7b89a7b09ed9a4dfcf59b700abf6447fcdf52abd | |
parent | 4225eab76862f099d4d55a0205ab0cdd39c0433c (diff) | |
download | ydb-2506981241ce7508b4cf18d5bc5007cdd102e18b.tar.gz |
support custom csv delimiter in writing csv_with_names
5 files changed, 61 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index e6d3a84fbf..b3741f765f 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -146,7 +146,7 @@ {"Index": 0, "Name": "Input", "Type": "TExprBase"}, {"Index": 1, "Name": "Format", "Type": "TCoAtom"}, {"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"}, - {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 3, "Name": "Settings", "Type": "TExprList", "Optional": true} ] } ] diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 0753addfa5..19b1c3441a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -95,7 +95,7 @@ private: const auto& path = input->Child(TS3Target::idx_Path)->Content(); if (path.empty() || path.back() != '/') { - ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ends with '/'.")); + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ending with '/'.")); return TStatus::Error; } @@ -151,10 +151,23 @@ private: return EnsureValidUserSchemaSetting(setting, ctx); } + if (name == "csvdelimiter") { + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + if (value.Content().Size() != 1) { + ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "csv_delimiter must be single character")); + return false; + } + return true; + } + return true; }; - if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema"}, validator, ctx)) { + if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter"}, validator, ctx)) { return TStatus::Error; } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index 3a8cab235c..9ff07d6ca3 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -17,6 +17,7 @@ TRuntimeNode BuildSerializeCall( TRuntimeNode input, const std::vector<std::string_view>& keys, const std::string_view& format, + const std::vector<std::pair<std::string_view, std::string_view>>& settings, TType* inputType, const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) @@ -41,8 +42,8 @@ TRuntimeNode BuildSerializeCall( ); } - TString settings; - TStringOutput stream(settings); + TString settingsAsJson; + TStringOutput stream(settingsAsJson); NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); writer.OpenMap(); if (!keys.empty()) { @@ -91,14 +92,18 @@ TRuntimeNode BuildSerializeCall( if (const auto blockSize = config->BlockSizeMemoryLimit.Get()) writer.Write("block_size_limit", *blockSize); + for (const auto& v : settings) { + writer.Write(v.first, v.second); + } + writer.CloseMap(); writer.Flush(); - if (settings == "{}") - settings.clear(); + if (settingsAsJson == "{}") + settingsAsJson.clear(); input = ctx.ProgramBuilder.FromFlow(input); const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})}); - return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input})); + return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settingsAsJson), {input})); } TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) { @@ -107,7 +112,12 @@ TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration std::vector<std::string_view> keys; keys.reserve(wrapper.KeyColumns().Size()); wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); }); - return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, config, ctx); + std::vector<std::pair<std::string_view, std::string_view>> settings; + if (wrapper.Settings()) { + settings.reserve(wrapper.Settings().Cast().Size()); + wrapper.Settings().Cast().Ref().ForEachChild([&](const TExprNode& v){ settings.emplace_back(v.Child(0)->Content(), v.Child(1)->Content()); }); + } + return BuildSerializeCall(input, keys, wrapper.Format().Value(), settings, inputItemType, config, ctx); } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 7558866a28..59b8e3e303 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -15,9 +15,9 @@ namespace { using namespace NNodes; using namespace NDq; -TExprNode::TPtr GetPartitionBy(const TExprNode& settings) { +TExprNode::TPtr FindChild(const TExprNode& settings, TStringBuf name) { for (auto i = 0U; i < settings.ChildrenSize(); ++i) { - if (settings.Child(i)->Head().IsAtom("partitionedby")) { + if (settings.Child(i)->Head().IsAtom(name)) { return settings.ChildPtr(i); } } @@ -25,14 +25,16 @@ TExprNode::TPtr GetPartitionBy(const TExprNode& settings) { return {}; } +TExprNode::TPtr GetPartitionBy(const TExprNode& settings) { + return FindChild(settings, "partitionedby"sv); +} + TExprNode::TPtr GetCompression(const TExprNode& settings) { - for (auto i = 0U; i < settings.ChildrenSize(); ++i) { - if (settings.Child(i)->Head().IsAtom("compression")) { - return settings.ChildPtr(i); - } - } + return FindChild(settings, "compression"sv); +} - return {}; +TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) { + return FindChild(settings, "csvdelimiter"sv); } TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { @@ -91,18 +93,24 @@ public: const auto& targetNode = write.Target(); const auto& cluster = write.DataSink().Cluster().StringValue(); const auto token = "cluster:default_" + cluster; - auto partBy = GetPartitionBy(write.Target().Settings().Ref()); + const auto& settings = write.Target().Settings().Ref(); + auto partBy = GetPartitionBy(settings); auto keys = GetPartitionKeys(partBy); auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); if (partBy) sinkSettingsBuilder.Add(std::move(partBy)); - auto compression = GetCompression(write.Target().Settings().Ref()); + auto compression = GetCompression(settings); const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv); if (compression) sinkSettingsBuilder.Add(std::move(compression)); + auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); + if (auto csvDelimiter = GetCsvDelimiter(settings)) { + sinkOutputSettingsBuilder.Add(std::move(csvDelimiter)); + } + if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink."; return keys.empty() ? @@ -119,6 +127,7 @@ public: .Build() .Format(write.Target().Format()) .KeyColumns().Build() + .Settings(sinkOutputSettingsBuilder.Done()) .Build() .Build() .Outputs<TDqStageOutputsList>() @@ -167,6 +176,7 @@ public: .Input("in") .Format(write.Target().Format()) .KeyColumns().Add(keys).Build() + .Settings(sinkOutputSettingsBuilder.Done()) .Build() .Build() .Outputs<TDqStageOutputsList>() @@ -227,6 +237,7 @@ public: .Input(inputStage.Program().Body().Ptr()) .Format(write.Target().Format()) .KeyColumns().Add(std::move(keys)).Build() + .Settings(sinkOutputSettingsBuilder.Done()) .Done(); return Build<TDqQuery>(ctx, write.Pos()) @@ -259,6 +270,7 @@ public: .Input("in") .Format(write.Target().Format()) .KeyColumns().Add(std::move(keys)).Build() + .Settings(sinkOutputSettingsBuilder.Done()) .Build() .Build() .Settings().Build() diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index befb8afde8..5d8cb4a084 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -819,6 +819,13 @@ NDB::FormatSettings GetFormatSettings(const std::string_view& view) { #undef SET_FLAG
#undef SUPPORTED_FLAGS
+ if (json.has("csvdelimiter")) {
+ auto delimiter = json["csvdelimiter"].getString();
+ if (delimiter.size() != 1) {
+ throw yexception() << "CSV delimiter should contain only one symbol. Specified delimiter '" << delimiter << "' is not allowed";
+ }
+ settings.csv.delimiter = delimiter[0];
+ }
}
return settings;
}
|