diff options
author | a-romanov <[email protected]> | 2022-05-23 13:41:12 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-05-23 13:41:12 +0300 |
commit | 85313e5e687aad386991676ae4c3b21f39d399a0 (patch) | |
tree | a331596af4269451ee1d175bb51607ad63139b9b | |
parent | a559fceed98b860795ce73c9ee5d42d18bf9ec24 (diff) |
YQ-727 Settings for parse input stream under coro.
ref:27cd17d5797ba8fbdf7f0648fdd17c8be3b7b672
6 files changed, 52 insertions, 23 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h index 83d36702822..ab66d5da6cd 100644 --- a/ydb/library/yql/ast/yql_expr.h +++ b/ydb/library/yql/ast/yql_expr.h @@ -1429,7 +1429,7 @@ public: Result = std::move(result); } - bool IsCallable(const TStringBuf& name) const { + bool IsCallable(const std::string_view& name) const { ENSURE_NOT_DELETED return Type() == TExprNode::Callable && Content() == name; } @@ -1460,11 +1460,16 @@ public: return Type() == TExprNode::World; } - bool IsAtom(const TStringBuf& content) const { + bool IsAtom(const std::string_view& content) const { ENSURE_NOT_DELETED return Type() == TExprNode::Atom && Content() == content; } + bool IsAtom(const std::initializer_list<std::string_view>& names) const { + ENSURE_NOT_DELETED + return Type() == TExprNode::Atom && names.end() != std::find(names.begin(), names.end(), Content()); + } + bool IsList() const { ENSURE_NOT_DELETED return Type() == TExprNode::List; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 1847aa8760c..c94180e7c50 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -273,7 +273,7 @@ struct TReadSpec { NDB::ColumnsWithTypeAndName Columns; NDB::FormatSettings Settings; - TString Format; + TString Format, Compression; }; class TS3ReadCoroImpl : public TActorCoroImpl { @@ -643,6 +643,25 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( } readSpec->Format = params.GetFormat(); + const auto& settings = params.GetSettings(); + if (const auto it = settings.find("compression"); settings.cend() != it) + readSpec->Compression = it->second; + +#define SUPPORTED_FLAGS(xx) \ + xx(skip_unknown_fields) \ + xx(import_nested_json) \ + xx(with_names_use_header) \ + xx(null_as_default) \ + +#define SET_FLAG(flag) \ + if (const auto it = settings.find(#flag); settings.cend() != it) \ + readSpec->Settings.flag = FromString<bool>(it->second); + + SUPPORTED_FLAGS(SET_FLAG) + +#undef SET_FLAG +#undef SUPPORTED_FLAGS + const auto actor = new TS3StreamReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), readSpec, computeActorId, retryConfig); return {actor, actor}; } else { diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 8afa27d2032..91be8e870af 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -59,7 +59,7 @@ "Children": [ {"Index": 2, "Name": "Format", "Type": "TCoAtom"}, {"Index": 3, "Name": "RowType", "Type": "TExprBase"}, - {"Index": 4, "Name": "Compression", "Type": "TCoAtom", "Optional": true} + {"Index": 4, "Name": "Settings", "Type": "TExprBase", "Optional": true} ] }, { diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index 73a89ec12d9..857ef69262a 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -14,5 +14,5 @@ message TSource { repeated TPath Path = 3; optional string RowType = 4; optional string Format = 5; - optional string Compression = 6; + map<string, string> Settings = 6; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 1894030ed5d..d72c5c916e5 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -80,8 +80,7 @@ public: return IGraphTransformer::TStatus::Error; } - if (input->ChildrenSize() > TS3ParseSettings::idx_Compression && !EnsureAtom(*input->Child(TS3ParseSettings::idx_Compression), ctx)) { - ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ParseSettings::idx_Compression)->Pos()), "Expected compression atom.")); + if (input->ChildrenSize() > TS3ParseSettings::idx_Settings && !EnsureTuple(*input->Child(TS3ParseSettings::idx_Settings), ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 46be986602e..4ae8de33b46 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -84,6 +84,9 @@ public: const auto rowType = s3ReadObject.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType(); const auto& clusterName = s3ReadObject.DataSource().Cluster().StringValue(); + const auto token = "cluster:default_" + clusterName; + YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token; + TExprNode::TListType settings(1U, ctx.Builder(s3ReadObject.Object().Pos()) .List() @@ -92,19 +95,6 @@ public: .Seal().Build() ); - if (const auto& objectSettings = s3ReadObject.Object().Settings()) { - settings.emplace_back( - ctx.Builder(objectSettings.Cast().Pos()) - .List() - .Atom(0, "settings", TNodeFlags::Default) - .Add(1, objectSettings.Cast().Ptr()) - .Seal().Build() - ); - } - - const auto token = "cluster:default_" + clusterName; - YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token; - if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); useCoro && *useCoro && !s3ReadObject.Object().Format().Ref().IsAtom({"raw", "json_list"})) return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3ParseSettings>() @@ -114,12 +104,23 @@ public: .Build() .Format(s3ReadObject.Object().Format()) .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .Settings(s3ReadObject.Object().Settings()) .Build() .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) .Done().Ptr(); - else + else { + if (const auto& objectSettings = s3ReadObject.Object().Settings()) { + settings.emplace_back( + ctx.Builder(objectSettings.Cast().Pos()) + .List() + .Atom(0, "settings", TNodeFlags::Default) + .Add(1, objectSettings.Cast().Ptr()) + .Seal().Build() + ); + } + return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3SourceSettings>() .Paths(s3ReadObject.Object().Paths()) @@ -131,6 +132,7 @@ public: .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) .Done().Ptr(); + } } return read; } @@ -158,8 +160,12 @@ public: srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); srcDesc.SetRowType(NCommon::WriteTypeToYson(parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), NYT::NYson::EYsonFormat::Text)); - if (const auto compression = parseSettings.Compression()) - srcDesc.SetCompression(compression.Cast().StringValue().c_str()); + if (const auto maySettings = parseSettings.Settings()) { + const auto& settings = maySettings.Cast(); + for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) { + srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().Head().Content())}); + } + } } protoSettings.PackFrom(srcDesc); |