diff options
| author | a-romanov <[email protected]> | 2022-03-21 20:07:05 +0300 |
|---|---|---|
| committer | a-romanov <[email protected]> | 2022-03-21 20:07:05 +0300 |
| commit | 11ec7b3d4dc40d3821ea5e30fca3075d13d7d7c1 (patch) | |
| tree | 5c1a82943eb52b57ec438d374bc6d8a2ad9260e2 | |
| parent | e5999e6b7ca4459e45e8ccbb73da0c041009be91 (diff) | |
YQ-727 S3 Pragma for enable coro actor.
ref:6937f2c5747c0c2c4bfa89a488785c0a2eb849c4
3 files changed, 26 insertions, 25 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 3106467d79e..46be986602e 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -104,32 +104,33 @@ public: const auto token = "cluster:default_" + clusterName; YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token; -/* - return Build<TDqSourceWrap>(ctx, read->Pos()) - .Input<TS3ParseSettings>() - .Paths(s3ReadObject.Object().Paths()) - .Token<TCoSecureParam>() - .Name().Build(token) + + if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); useCoro && *useCoro && !s3ReadObject.Object().Format().Ref().IsAtom({"raw", "json_list"})) + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TS3ParseSettings>() + .Paths(s3ReadObject.Object().Paths()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Format(s3ReadObject.Object().Format()) + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) .Build() - .Format(s3ReadObject.Object().Format()) .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) - .Build() - .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) - .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) - .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) - .Done().Ptr(); -/*/ - return Build<TDqSourceWrap>(ctx, read->Pos()) - .Input<TS3SourceSettings>() - .Paths(s3ReadObject.Object().Paths()) - .Token<TCoSecureParam>() - .Name().Build(token) + .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) + .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) + .Done().Ptr(); + else + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TS3SourceSettings>() + .Paths(s3ReadObject.Object().Paths()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() .Build() - .Build() - .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) - .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) - .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) - .Done().Ptr(); + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) + .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) + .Done().Ptr(); } return read; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 99c308eaf3b..2707685a266 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -8,7 +8,7 @@ using namespace NCommon; TS3Configuration::TS3Configuration() { - REGISTER_SETTING(*this, SourceActor); + REGISTER_SETTING(*this, SourceCoroActor); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 9bbcb72c87a..1d3a8b985a4 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -10,7 +10,7 @@ namespace NYql { struct TS3Settings { using TConstPtr = std::shared_ptr<const TS3Settings>; - NCommon::TConfSetting<bool, false> SourceActor; + NCommon::TConfSetting<bool, false> SourceCoroActor; }; struct TS3ClusterSettings { |
