summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author a-romanov <[email protected]> 2022-05-23 13:41:12 +0300
committer a-romanov <[email protected]> 2022-05-23 13:41:12 +0300
commit 85313e5e687aad386991676ae4c3b21f39d399a0 (patch)
tree a331596af4269451ee1d175bb51607ad63139b9b
parent a559fceed98b860795ce73c9ee5d42d18bf9ec24 (diff)
YQ-727 Settings for parse input stream under coro.
ref:27cd17d5797ba8fbdf7f0648fdd17c8be3b7b672
-rw-r--r-- ydb/library/yql/ast/yql_expr.h 9
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 21
-rw-r--r-- ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json 2
-rw-r--r-- ydb/library/yql/providers/s3/proto/source.proto 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp 3
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp 38
6 files changed, 52 insertions, 23 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h
index 83d36702822..ab66d5da6cd 100644
--- a/ydb/library/yql/ast/yql_expr.h
+++ b/ydb/library/yql/ast/yql_expr.h
@@ -1429,7 +1429,7 @@ public:
Result = std::move(result);
}
- bool IsCallable(const TStringBuf& name) const {
+ bool IsCallable(const std::string_view& name) const {
ENSURE_NOT_DELETED
return Type() == TExprNode::Callable && Content() == name;
}
@@ -1460,11 +1460,16 @@ public:
return Type() == TExprNode::World;
}
- bool IsAtom(const TStringBuf& content) const {
+ bool IsAtom(const std::string_view& content) const {
ENSURE_NOT_DELETED
return Type() == TExprNode::Atom && Content() == content;
}
+ bool IsAtom(const std::initializer_list<std::string_view>& names) const {
+ ENSURE_NOT_DELETED
+ return Type() == TExprNode::Atom && names.end() != std::find(names.begin(), names.end(), Content());
+ }
+
bool IsList() const {
ENSURE_NOT_DELETED
return Type() == TExprNode::List;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1847aa8760c..c94180e7c50 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -273,7 +273,7 @@ struct TReadSpec {
NDB::ColumnsWithTypeAndName Columns;
NDB::FormatSettings Settings;
- TString Format;
+ TString Format, Compression;
};
class TS3ReadCoroImpl : public TActorCoroImpl {
@@ -643,6 +643,25 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
}
readSpec->Format = params.GetFormat();
+ const auto& settings = params.GetSettings();
+ if (const auto it = settings.find("compression"); settings.cend() != it)
+ readSpec->Compression = it->second;
+
+#define SUPPORTED_FLAGS(xx) \
+ xx(skip_unknown_fields) \
+ xx(import_nested_json) \
+ xx(with_names_use_header) \
+ xx(null_as_default) \
+
+#define SET_FLAG(flag) \
+ if (const auto it = settings.find(#flag); settings.cend() != it) \
+ readSpec->Settings.flag = FromString<bool>(it->second);
+
+ SUPPORTED_FLAGS(SET_FLAG)
+
+#undef SET_FLAG
+#undef SUPPORTED_FLAGS
+
const auto actor = new TS3StreamReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), readSpec, computeActorId, retryConfig);
return {actor, actor};
} else {
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 8afa27d2032..91be8e870af 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -59,7 +59,7 @@
"Children": [
{"Index": 2, "Name": "Format", "Type": "TCoAtom"},
{"Index": 3, "Name": "RowType", "Type": "TExprBase"},
- {"Index": 4, "Name": "Compression", "Type": "TCoAtom", "Optional": true}
+ {"Index": 4, "Name": "Settings", "Type": "TExprBase", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 73a89ec12d9..857ef69262a 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -14,5 +14,5 @@ message TSource {
repeated TPath Path = 3;
optional string RowType = 4;
optional string Format = 5;
- optional string Compression = 6;
+ map<string, string> Settings = 6;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 1894030ed5d..d72c5c916e5 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -80,8 +80,7 @@ public:
return IGraphTransformer::TStatus::Error;
}
- if (input->ChildrenSize() > TS3ParseSettings::idx_Compression && !EnsureAtom(*input->Child(TS3ParseSettings::idx_Compression), ctx)) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ParseSettings::idx_Compression)->Pos()), "Expected compression atom."));
+ if (input->ChildrenSize() > TS3ParseSettings::idx_Settings && !EnsureTuple(*input->Child(TS3ParseSettings::idx_Settings), ctx)) {
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 46be986602e..4ae8de33b46 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -84,6 +84,9 @@ public:
const auto rowType = s3ReadObject.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType();
const auto& clusterName = s3ReadObject.DataSource().Cluster().StringValue();
+ const auto token = "cluster:default_" + clusterName;
+ YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token;
+
TExprNode::TListType settings(1U,
ctx.Builder(s3ReadObject.Object().Pos())
.List()
@@ -92,19 +95,6 @@ public:
.Seal().Build()
);
- if (const auto& objectSettings = s3ReadObject.Object().Settings()) {
- settings.emplace_back(
- ctx.Builder(objectSettings.Cast().Pos())
- .List()
- .Atom(0, "settings", TNodeFlags::Default)
- .Add(1, objectSettings.Cast().Ptr())
- .Seal().Build()
- );
- }
-
- const auto token = "cluster:default_" + clusterName;
- YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token;
-
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); useCoro && *useCoro && !s3ReadObject.Object().Format().Ref().IsAtom({"raw", "json_list"}))
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettings>()
@@ -114,12 +104,23 @@ public:
.Build()
.Format(s3ReadObject.Object().Format())
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+ .Settings(s3ReadObject.Object().Settings())
.Build()
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
.DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
.Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
.Done().Ptr();
- else
+ else {
+ if (const auto& objectSettings = s3ReadObject.Object().Settings()) {
+ settings.emplace_back(
+ ctx.Builder(objectSettings.Cast().Pos())
+ .List()
+ .Atom(0, "settings", TNodeFlags::Default)
+ .Add(1, objectSettings.Cast().Ptr())
+ .Seal().Build()
+ );
+ }
+
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3SourceSettings>()
.Paths(s3ReadObject.Object().Paths())
@@ -131,6 +132,7 @@ public:
.DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
.Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
.Done().Ptr();
+ }
}
return read;
}
@@ -158,8 +160,12 @@ public:
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
srcDesc.SetRowType(NCommon::WriteTypeToYson(parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), NYT::NYson::EYsonFormat::Text));
- if (const auto compression = parseSettings.Compression())
- srcDesc.SetCompression(compression.Cast().StringValue().c_str());
+ if (const auto maySettings = parseSettings.Settings()) {
+ const auto& settings = maySettings.Cast();
+ for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) {
+ srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().Head().Content())});
+ }
+ }
}
protoSettings.PackFrom(srcDesc);