about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author uzhas <uzhas@ydb.tech> 2022-10-26 13:43:35 +0300
committer uzhas <uzhas@ydb.tech> 2022-10-26 13:43:35 +0300
commit 2506981241ce7508b4cf18d5bc5007cdd102e18b (patch)
tree 7b89a7b09ed9a4dfcf59b700abf6447fcdf52abd
parent 4225eab76862f099d4d55a0205ab0cdd39c0433c (diff)
download ydb-2506981241ce7508b4cf18d5bc5007cdd102e18b.tar.gz
support custom csv delimiter in writing csv_with_names
-rw-r--r-- ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 17
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp 22
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp 32
-rw-r--r-- ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp 7
5 files changed, 61 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index e6d3a84fbf..b3741f765f 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -146,7 +146,7 @@
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "Format", "Type": "TCoAtom"},
{"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"},
- {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ {"Index": 3, "Name": "Settings", "Type": "TExprList", "Optional": true}
]
}
]
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 0753addfa5..19b1c3441a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -95,7 +95,7 @@ private:
const auto& path = input->Child(TS3Target::idx_Path)->Content();
if (path.empty() || path.back() != '/') {
- ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ends with '/'."));
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ending with '/'."));
return TStatus::Error;
}
@@ -151,10 +151,23 @@ private:
return EnsureValidUserSchemaSetting(setting, ctx);
}
+ if (name == "csvdelimiter") {
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ if (value.Content().Size() != 1) {
+ ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "csv_delimiter must be single character"));
+ return false;
+ }
+ return true;
+ }
+
return true;
};
- if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema"}, validator, ctx)) {
+ if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter"}, validator, ctx)) {
return TStatus::Error;
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index 3a8cab235c..9ff07d6ca3 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -17,6 +17,7 @@ TRuntimeNode BuildSerializeCall(
TRuntimeNode input,
const std::vector<std::string_view>& keys,
const std::string_view& format,
+ const std::vector<std::pair<std::string_view, std::string_view>>& settings,
TType* inputType,
const TS3Configuration::TPtr& config,
NCommon::TMkqlBuildContext& ctx)
@@ -41,8 +42,8 @@ TRuntimeNode BuildSerializeCall(
);
}
- TString settings;
- TStringOutput stream(settings);
+ TString settingsAsJson;
+ TStringOutput stream(settingsAsJson);
NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
writer.OpenMap();
if (!keys.empty()) {
@@ -91,14 +92,18 @@ TRuntimeNode BuildSerializeCall(
if (const auto blockSize = config->BlockSizeMemoryLimit.Get())
writer.Write("block_size_limit", *blockSize);
+ for (const auto& v : settings) {
+ writer.Write(v.first, v.second);
+ }
+
writer.CloseMap();
writer.Flush();
- if (settings == "{}")
- settings.clear();
+ if (settingsAsJson == "{}")
+ settingsAsJson.clear();
input = ctx.ProgramBuilder.FromFlow(input);
const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})});
- return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input}));
+ return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settingsAsJson), {input}));
}
TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) {
@@ -107,7 +112,12 @@ TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration
std::vector<std::string_view> keys;
keys.reserve(wrapper.KeyColumns().Size());
wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); });
- return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, config, ctx);
+ std::vector<std::pair<std::string_view, std::string_view>> settings;
+ if (wrapper.Settings()) {
+ settings.reserve(wrapper.Settings().Cast().Size());
+ wrapper.Settings().Cast().Ref().ForEachChild([&](const TExprNode& v){ settings.emplace_back(v.Child(0)->Content(), v.Child(1)->Content()); });
+ }
+ return BuildSerializeCall(input, keys, wrapper.Format().Value(), settings, inputItemType, config, ctx);
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 7558866a28..59b8e3e303 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -15,9 +15,9 @@ namespace {
using namespace NNodes;
using namespace NDq;
-TExprNode::TPtr GetPartitionBy(const TExprNode& settings) {
+TExprNode::TPtr FindChild(const TExprNode& settings, TStringBuf name) {
for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
- if (settings.Child(i)->Head().IsAtom("partitionedby")) {
+ if (settings.Child(i)->Head().IsAtom(name)) {
return settings.ChildPtr(i);
}
}
@@ -25,14 +25,16 @@ TExprNode::TPtr GetPartitionBy(const TExprNode& settings) {
return {};
}
+TExprNode::TPtr GetPartitionBy(const TExprNode& settings) {
+ return FindChild(settings, "partitionedby"sv);
+}
+
TExprNode::TPtr GetCompression(const TExprNode& settings) {
- for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
- if (settings.Child(i)->Head().IsAtom("compression")) {
- return settings.ChildPtr(i);
- }
- }
+ return FindChild(settings, "compression"sv);
+}
- return {};
+TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) {
+ return FindChild(settings, "csvdelimiter"sv);
}
TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
@@ -91,18 +93,24 @@ public:
const auto& targetNode = write.Target();
const auto& cluster = write.DataSink().Cluster().StringValue();
const auto token = "cluster:default_" + cluster;
- auto partBy = GetPartitionBy(write.Target().Settings().Ref());
+ const auto& settings = write.Target().Settings().Ref();
+ auto partBy = GetPartitionBy(settings);
auto keys = GetPartitionKeys(partBy);
auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
if (partBy)
sinkSettingsBuilder.Add(std::move(partBy));
- auto compression = GetCompression(write.Target().Settings().Ref());
+ auto compression = GetCompression(settings);
const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv);
if (compression)
sinkSettingsBuilder.Add(std::move(compression));
+ auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
+ if (auto csvDelimiter = GetCsvDelimiter(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(csvDelimiter));
+ }
+
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
return keys.empty() ?
@@ -119,6 +127,7 @@ public:
.Build()
.Format(write.Target().Format())
.KeyColumns().Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
.Build()
.Build()
.Outputs<TDqStageOutputsList>()
@@ -167,6 +176,7 @@ public:
.Input("in")
.Format(write.Target().Format())
.KeyColumns().Add(keys).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
.Build()
.Build()
.Outputs<TDqStageOutputsList>()
@@ -227,6 +237,7 @@ public:
.Input(inputStage.Program().Body().Ptr())
.Format(write.Target().Format())
.KeyColumns().Add(std::move(keys)).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
.Done();
return Build<TDqQuery>(ctx, write.Pos())
@@ -259,6 +270,7 @@ public:
.Input("in")
.Format(write.Target().Format())
.KeyColumns().Add(std::move(keys)).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
.Build()
.Build()
.Settings().Build()
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index befb8afde8..5d8cb4a084 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -819,6 +819,13 @@ NDB::FormatSettings GetFormatSettings(const std::string_view& view) {
#undef SET_FLAG
#undef SUPPORTED_FLAGS
+ if (json.has("csvdelimiter")) {
+ auto delimiter = json["csvdelimiter"].getString();
+ if (delimiter.size() != 1) {
+ throw yexception() << "CSV delimiter should contain only one symbol. Specified delimiter '" << delimiter << "' is not allowed";
+ }
+ settings.csv.delimiter = delimiter[0];
+ }
}
return settings;
}