diff options
author | hor911 <hor911@ydb.tech> | 2023-03-06 23:31:20 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-03-06 23:31:20 +0300 |
commit | e61ae9ba657c9b199ec24061043a5426aed28ecc (patch) | |
tree | 983e283e67a47714ad51a7f3a8e80f12b3e08ea4 | |
parent | fccb745feb0ac047fb767a1d9829c7b48d8d8f48 (diff) | |
download | ydb-e61ae9ba657c9b199ec24061043a5426aed28ecc.tar.gz |
Refactor s3.ArrowThreadPool and introduce s3.ArrowReadAheadRowGroupCount options
7 files changed, 9 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 7a8e65a476..06b69e4d1e 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -77,11 +77,6 @@ "Match": {"Type": "Callable", "Name": "S3ArrowSettings"} }, { - "Name": "TS3CoroArrowSettings", - "Base": "TS3ParseSettingsBase", - "Match": {"Type": "Callable", "Name": "S3CoroArrowSettings"} - }, - { "Name": "TS3Read", "Base": "TFreeArgCallable", "Match": {"Type": "Callable", "Name": "Read!"}, diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index c174ce2cd9..bf379c1b99 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -17,4 +17,5 @@ message TSource { map<string, string> Settings = 6; bool Arrow = 7; bool ThreadPool = 8; + uint64 ReadAheadRowGroupCount = 9; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 00aeccd4cc..0e1c9a218c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -111,7 +111,6 @@ public: AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings)); AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); - AddHandler({TS3CoroArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig)); } @@ -178,7 +177,7 @@ public: } const TTypeAnnotationNode* itemType = nullptr; - if (input->Content() == TS3ArrowSettings::CallableName() || input->Content() == TS3CoroArrowSettings::CallableName()) { + if (input->Content() == TS3ArrowSettings::CallableName()) { TVector<const TItemExprType*> blockRowTypeItems; for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType()))); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 3a21c6929a..42c2b77e60 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -194,8 +194,7 @@ public: } return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3ParseSettingsBase>() - .CallableName((supportedArrowTypes && format == "parquet") ? - (State_->Configuration->ArrowThreadPool.Get().GetOrElse(true) ? TS3ArrowSettings::CallableName() : TS3CoroArrowSettings::CallableName() ): + .CallableName((supportedArrowTypes && format == "parquet") ? TS3ArrowSettings::CallableName(): TS3ParseSettings::CallableName()) .Paths(s3ReadObject.Object().Paths()) .Token<TCoSecureParam>() @@ -277,8 +276,9 @@ public: if (const auto mayParseSettings = settings.Maybe<TS3ParseSettingsBase>()) { const auto parseSettings = mayParseSettings.Cast(); srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); - srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()) || bool(parseSettings.Maybe<TS3CoroArrowSettings>())); - srcDesc.SetThreadPool(bool(parseSettings.Maybe<TS3ArrowSettings>())); + srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>())); + srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true)); + srcDesc.SetReadAheadRowGroupCount(State_->Configuration->ArrowReadAheadRowGroupCount.Get().GetOrElse(0)); const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); // exclude extra columns to get actual row type we need to read from input diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index 997090570f..bd3e92d89a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -514,7 +514,7 @@ public: } } - if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>() && !parseSettingsBase.Maybe<TS3CoroArrowSettings>()) { + if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>()) { const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems); auto item = GetLightColumn(*readRowDataType); YQL_ENSURE(item); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index a60dfa7639..fe48fc0be0 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -16,6 +16,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, InFlightMemoryLimit); REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000); REGISTER_SETTING(*this, ArrowThreadPool); + REGISTER_SETTING(*this, ArrowReadAheadRowGroupCount); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index e2f5caa3cd..53e8c3a7e4 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -18,6 +18,7 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink. NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000. NCommon::TConfSetting<bool, false> ArrowThreadPool; + NCommon::TConfSetting<ui64, false> ArrowReadAheadRowGroupCount; }; struct TS3ClusterSettings { |